导图社区 rabbitmq思维导图
rabbitmq的基本概念介绍归纳,AMQP常见命令的使用说明,消费者与生产者的运转流程,消息中间件常见问题的解决方法(生产者确认、死信队列、延迟队列、消费者确认、如何保证消息到达broker等)。
编辑于2021-07-15 15:17:28rabbitmq
概念介绍
生产者
生产者会生产消息,会将消息发送到交换器,并设置一些关于消息的属性,比如是否持久化等
消费者
消费者会消费消息
Broker
消息中间件的服务节点
队列
存放消息的地方
交换器
有4种类型交换器
fanout
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
direct
会把消息路由到那些绑定键与路由键完全相同的队列中
topic
该交换器关联的绑定键是使用了通配符(*与#)的
单词
2个 . 之间的内容称为一个单词
通配符
*通配符
可以匹配一个单词
#通配符
可以匹配多个单词
headers
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息中的headers属性进行匹配。
生产者会将消息和路由键发给交换器,根据路由键决定将消息发送到哪个队列
绑定键
通过绑定键将消息队列与交换器绑定起来
路由键
每当发送一个消息时都会带着一个路由键一并发给交换器,交换器根据路由键决定将消息发送给哪个队列
消息
消息包含2部分:消息体和标签
消息体
消息体如果是Java类型对象,通常需要进行序列化操作,类要实现序列化接口
标签
由2部分组成:交换器、路由键;因为只有知道了交换器与路由键才可以将消息路由到指定的队列中
总结
消息不能在队列层级进行广播,即不能将队列的每条消息发送给所有的订阅者,只能轮询消费消息
交换器声明
常用设置
交换器名称
指定交换器类型
交换器是否持久化
设置持久化后,交换器信息会被持久化,服务器重启后交换器信息不会丢失
设置当其他队列或交换器与此交换器解绑时是否自动删除本交换器
删除的前提时,有其他交换器或队列与此交换器绑定了,并且现在所有已绑定的队列或交换器与此交换器进行了解绑,那么根据设置判断是否删除改交换器
是否设置为内置交换器
首先声明一点:交换器不仅可以与队列绑定,还可以与内置交换器进行绑定
生产者生产的消息可以直接到达交换器,但是不能直接到达内置交换器,再通过内置交换器到达队列
其他一些结构化参数,通过hashmap传递
alternate-exchange
指定备胎交换器,value的值即为备胎交换器的名称
队列声明
常用设置
队列名称
是否持久化
是否排他
如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接( Connection) 可见的,同一个连接的不同信道 (Channel)是可以同时访问同一连接创建的排他队列; "首次"是指如果 个连接已经声明了排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
是否自动删除
自动删除的前提
至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
其他一些结构化参数,通过hashmap传递
x-message-ttl
设置消息的过期时间,value为整数,单位是毫秒
x-expires
设置队列的过期时间,value为整数,单位是毫秒
x-max-length
设置队列可以容纳的最大消息数
x-max-length-bytes
设置队列中可以接收消息的大小
x-dead-letter-exchange
如果消息过期或被拒绝,将重新发布到的exchange的可选名称
x-dead-letter-routing-key
当消息为死信时使用的可选替换路由键。如果未设置,则将使用消息的原始路由键
x-max-priority
设置最大优先级
注意事项
生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为"传输"模式,之后才能声明队列。
生产消息注意事项
1.在生产消息时必须指定交换器名与队列名,以及消息如果未路由到队列改如何处理这个消息(丢弃还是返回给生产者),设置消息的投递模式deliveryMode(即消息是否会被持久化),消息的优先级,消息类型content-type,以及设定消息的属性(在类BasicProperties类中的字段)
BasicProperties类

消费消息
2种消费消息模式
推模式
采用Basic.Consume进行消费
在推模式中,可以通过持续订阅的方式来消费消息
常用设置
队列名称
是否自动确认
消费者标签
用来区分多个消费者
是否可以将本地生产者的消息传递给本地消费者
是否排他
设置消费者的其他参数
设置消费者的回调函数
用来处理Rabbitmq推送过来的消息
拉模式
采用Basic.Get进行消费
通过channel.basicGet()方法可以单条获取消息,其返回值时GetResponse
注意事项
Basic.Consume 将信道 (Channel) 置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos的限制.如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费.但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ的性能.如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。
消费端的确认与拒绝
确认
概念
为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除) 。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
当autoAck 参数置为 false ,对于 RabbitMQ 服务端而言 ,队列中的消息分成了两个部分,一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
RabbitMQ 会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
常用设置
是否自动确认消息已消费
拒绝
在消费者接收到消息后,想明确拒绝当前的消息而不是确认
拒绝命令
Basic.Reject
该命令只能一次拒绝一个消息
Basic.Nack
该命令可以一次拒绝批量消息
常用设置
消息编号
消息是否重新存入队列
以便可以发送给下一个消费者
运转流程
生产者流程
1.生产者连接到Rabbitmq broker,建立一个连接(Connect),开启一个信道(Channel)
2.生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
3.生产者声明一个队列,并设置相关属性,比如是否排它、是否持久化、是否自动删除等
4.生产者通过绑定键将交换器和队列绑定起来
5.生产者发送消息至RabbitMQ Broker,其中包括路由键及交换器等信息
6.相应的交换器根据接收到的路由键查找相匹配的队列
7.如果找到,则将从生产者发送过来的消息存入相应的队列
8.如果没有找到,则根据生产者配置的属性选择丢弃还是会退给生产者
9.关闭信道
10.关闭连接
消费者流程
1.消费者连接到RabbitMQ Broker,建立一个连接,开启一个信道
2.消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
3.等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接受消息
4.消费者确认(ack)接收到的消息
5.RabbitMQ从队列中删除相应已经被确认的消息
6.关闭信道
7.关闭连接
总结
1.连接与信道Channel得区别
我们知道无论是生者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是Connection TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道 (Channel) ,每个信道都会被指派一个唯一的ID 。信道是建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

2.为什么要引入信道Channel
试想这样一个场景, 个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection ,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用类似 NIO' (Non-blocking 1/0) 的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了 Connection TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节 TCP 连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多 Connection ,将这些信道均摊到这些 Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节。
AMQP
概念
AMQP是一个协议,而RabbitMQ是就是AMQP协议的Erlang实现
AMQP协议层级关系
1. Module Layer
位于协议最高层,主要定义一些供客户端调用的命令
2. Session Layer
位于协议中间层,主要负责将客户端的命令发送给服务器
3. Transport Layer
位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等
AMQP协议命令流转过程
生产者流转过程

消费者流转过程

AMQP命令


进阶
消息何去何从
当消息在传递过程中不能到达目的地(目的队列),该怎么办?
1.丢弃消息
当生产消息方法的参数mandatory设置为false时会丢弃消息
2.返回给生产者
1.当生产消息方法的参数mandatory设置为true时会调用Basic.Return命令将消息返回给生产者
疑问
1.如何回去返回给生产者的消息
在信道上添加返回监听器ReturnListener
代码实现
生产者如何获取到没有被正确路由到合适队列的消息呢?

代码解读
上面代码中生产者没有成功地将消息路由到队列,此时 RabbitMQ 会通过Basic.Return 返回mandatory test这条消息,之后生产者客户端通过ReturnListener 监昕到了这个事 件,上面代码的最后输出应该是"Basic.Return 返回的结果是 mandatory test"。
2.当参数immediate设置为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么会将消息返回给生产者
注意:RabbitMQ3.0开始去掉了对immediate参数的支持
3.存储到备份交换器
说明
RabbitMQ 提供的备份交换器(Alternate Exchange) 可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定〉存储起来,而不用返回给客户端。
过期时间TTL
2种方式设置过期时间
1.通过队列属性设置过期时间,队列中的所有消息都有相同的过期时间
设置方法
原理
通过 channel.queueDeclare 方法中的 expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段 内也未调用过 Basic.Get 命令。
操作
用于表示过期时间的 x-expires 参数以毫秒为单位 井且服从和 x-message-ttl的约束条件,不过不能设置为0。比如该参数设置为1000 ,则表示该队列如果在1秒钟之内未使用则会被删除。
代码实现
Map<String , Object> args =new HashMap<String, Object>{) ; args.put( "x-expires" , 1800000); channel.queueDeclare("myqueue " , false , false , false , args) ;
2.设置每个消息的过期时间
设置方法
操作
在channel.queueDeclare 方法中加入x-message-ttl 参数实现的,这个参数的单位是毫秒。
代码实现
Map<String, Object> argss = new HashMap<String , Object>(); argss.put("x-message-ttl " , 6000); channel . queueDeclare(queueName , durable , exclusive , autoDelete , argss) ;
死信队列
概念
DLX ,全称为 Dead-Letter-Exchange ,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信 (dea message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
消息成为死信的几种情况
消息被拒绝 (Basic.Reject/Basic.Nack) ,井且设置 requeue 参数为false
消息过期
队列达到最大长度
为队列添加死信交换器DLX
操作
通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX
代码实现
channel . exchangeDeclare("dlx_exchange " , "direct "); // 创建 DLX: dlx_exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange" , " dlx_exchange "); //为队列 myqueue 添加 DLX channel . queueDeclare("myqueue" , false , false , false , args);
补充
操作
也可以为这个 DLX 指定路由键,通过参数x-dead-letter-routing-key来设置,如果没有特殊指定,则使用原队列的路由键
代码实现
args.put("x-dead-letter-routing-key" , "dlx-routing-key");
图解

延迟队列
概念
延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
常用场景
在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。
用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
注意
在AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的死信交换器(DLX)和过期时间(TTL) 模拟出延迟队列的功能。
实现思想
首先生产者将消息发送到一个交换器上,交换器根据路由键将消息发送到设定了过期时间的队列上,该队列上再设置一个死信交换器DLX,死信交换器上会绑定一个队列(即死信队列);消费者只订阅死信队列,从而可以实现延迟队列的功能。
图解

优先级队列
概念
优先级队列 顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
队列的优先级
实现
思路
可以通过设置队列的 x-max-priority 参数来实现。
代码
Map<String, Object> args =new HashMap<String, Object>() ; args.put( "x-max-priority" , 10) ; channel.queueDeclare( "queue.priority" , true , fa1se , false , args) ;
web控制台
优先级队列在控制台上的标志

消息的优先级
实现
思路
需要在发送时在消息中设置消息当前的优先级。
代码
设置当前发送的消息的优先级为5
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder() ; builder.priority(5) ; AMQP.BasicProperties properties = builder .build() ; channel.basicPub1ish( "exchange_priority" , "rk_priority",properties,("messages").getBytes () ) ;
注意
需要先设置队列的优先级,之后才能设置消息 的优先级。
消息优先级默认最低为0,最高为队列设置的最大优先级。
优先级高的消息可以被优先消费的前提
如果在消费者的消费速度大于生产者的速度且Broker 中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
RPC实现
持久化
前言
"持久化"这个词汇在前面的篇幅中有多次提及,持久化可以提高 RabbitM 的可靠性,防在异常情况(重启、关闭、右机等)下的数据丢失 。本节针对这个概念做一个总结。RabbitMQ的持久化分为3个部分:交换器的持久化、队列的持久化和消息的持久化。
3部分的持久化
交换器持久化
实现思路
交换器的持久化是通过在声明队列时将 durable 参数置为 true 实现的
是否持久化有何影响
如果交换器不设置持久化,那么在 RabbitMQ务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。
队列持久化
实现思路
队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的
是否持久化有何影响
如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
扩展
队列的持久化仅能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。
消息持久化
实现思路
通过将消息的投递模式(BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化。
注意
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。
可以将所有的消息都设直为持久化,但是这样会严重影响 RabbitMQ 的性能(随机)。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
思考
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的。
生产者确认(publisher confirm)
前言
在使用 RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器 ,何谈持久化?
解决生产者不知道消息是否正确到达服务器的2种机制
事务机制
rabbitmq客户端中与事务机制相关的3个方法
channel.txSelect
用于将当前信道设置成事务模式
channel.txCommit
用于提交事务
channel.txRollback
用于事务回滚
思想
在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。
代码
channel.txSelect(); channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,"transaction messages".getBytes()); channel.txCommit();
代码对应的AMQP协议流转过程

补充
如果要发送多条消息,则将 channel.basicPublish和channel.txCommit 等方法包裹进循环内即可
代码
channel.txSelect(); for (int i=0 ; i<LOOP_TIMES;i++) { try { channel.basicPublish ("exchange","routingKey",null,("messages"+ i).getBytes()); channel.txCommit(); ) catch (IOException e) { e.printStackTrace() ; channel.txRollback() ; } }
注意事项
消息重发
事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚 ,与此同时可以进行消息重发。
事务机制会"吸干" RabbitMQ 的性能
使用事务机制会"吸干" RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从 AMQP 协议层面来看并没有更好的办法,但是 RabbitMQ 提供了 一个改进方案,即发送方确认机制。
发送方确认机制
前言
前面介绍了RabbitMQ 可能会遇到的一个问题,即消息发送方(生产者)并不知道消息是否真正地到达了 RabbitMQ 。随后了解到在AMQP协议层面提供了事务机制来解决这个问题,但是采用事务机制实现会严重降低 RabbitMQ的消息吞吐量,这里就引入了一种轻量级的方式——发送方确认(publisher confirm) 机制。
实现思想
生产者将信道设置成 confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 会发送一个确认(Basic.Ack) 给生产者(包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外RabbitMQ可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理,可以参考图 4-10 。注意辨别这里的确认和消费时候的确认之间的异同。
发送方确认机制图解

补充
生产者通过调用 channel.confirmSelect 方法(即Confirm.Select 命令)将信道设置为confirm 模式,之后RabbitMQ会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为confirm模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消息既被 ack 又被 nack 情况,并且RabbitMQ并没有对消息被comfirm的快慢做任何保证。
代码实现
try { channel.confirmSelect(); // 将信道置为 publisher confirm 模式 //之后正常发送消息 channel.basicPublish("exchange","routingKey",null,"publisher confirm test".getBytes()); if (!channel.waitForConfirms()) { System.out.println("send message failed") ; // do something else.... } } catch (InterruptedException e) { e.printStackTrace() ; }
解析与补充
如果发送多条消息,只需要将 channel.basicPublish和channel.waitForConfirms 方法包裹在循环里面即可,可以参考事务机制,不过不需要把channel.confirmSelect 方法包裹在循环内部。
问题
上述代码是每发布一条消息就同步等待broker返回ack或者nack,这样就类似于同步的操作了,这样的效率会很低
总结
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下 发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 nack命令。
改进提高性能
异步confirm方法
思想
提供一个回调方法,服务端确认了一条或多条消息后客户端会回调这个方法进行处理
操作步骤
1.执行channel.comfirmSelect方法将通道设置为生产者确认机制。 2.执行channel.addConfirmListener方法添加监听器,监听器中有2个方法,需要进行覆盖重写。 3.
代码
批量confirm方法
思想
每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回
操作步骤
代码
总结
事务机制与生产者确认机制是互斥的
事务机制和 publisher confirm机制确保的是消息能够正确地发送至 RabbitMQ ,这里的"发送至 RabbitMQ" 的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列 更进一步地讲,发送方要配合mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。
消费端要点介绍
前言
消费消息与消费端的确认与拒绝介绍了如何正确地消费消息。消费者客户端可以通过推模式或者拉模式的方式来获取井消费消息,当消费者处理完业务逻辑需要手动确认消息己被接收,这样 RabbitMQ才能把当前消息从队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过 channel.basicNack 或者 channel.basicReject 来拒绝掉。
这里对于 RabbitMQ 消费端来说,还有几点需要注意:消息分发、消息顺序性、弃用QueueingConsumer
消息分发
概念
RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin )的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
轮询的缺点
默认情况下,如果有n个消费者,那么 RabbitMQ会将第m条消息分发给第 m%n (取余的方式)个消费者, RabbitMQ 不管消费者是否消费并已经确认 (Basic.Ack) 了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
性能优化
解决轮询分发消息带来的应用吞吐量降低的问题
思路
这里就要用到 channel.basicQos(int prefetchCount) 这个方法,如前面章节所述, channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。
用到的AMQP指令
Basic.Qos
应用步骤
举例说明,在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后订 阅了某个队列进行消费。 RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP中的"滑动窗口"。
注意事项
Basic.Qos 的使用对于拉模式的消费方式无效.
消息顺序性
概念
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子不考虑消息重复的情况,如果生产者发布的消息分别为 msg1,msg2, msg3 ,那么消费者必然也是按照 msgl msg2 msg3 的顺序进行消费的。
拓展
什么情况下rabbitmq的消息是顺序性的
在不使用任何 RabbitMQ高级特性 ,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。
什么情况下rabbitmq的顺序性是不成立的
如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的 ,那么消息在生产者这个源头就出现了错序。
如果启用 publisher confirm 时,在发生超时、中断,又或者是收到 RabbitMQ Basic.Nack 命令时,那么同样需要补偿发送,结果与事务机制一样会错序。
如果生产者发送的消息设置了不同的超时时间,井且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的
如果1个队列按照前后顺序分有 msg1,msg2,msg3,msg4 这4个消息,同时有 ConsumerA、ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为 msg1、 msg3,ConsumerB 中的消息为 msg2、msg4, ConsumerA 收到消息 msg1之后并不想处理而调用了 Basic.Nack/.Reject 将消息拒绝,与此同时将requeue设置为 true ,这样这条消息就可以重新存入队列中。消息msg1之后被发送到了ConsumerB 中,此时ConsumerB已经消费了msg2、msg4 ,之后再消费 msg1,这样消息顺序性也就错乱了。或者消息 msg1 又重新发往 ConsumerA 中,此时 ConsumerA 已经消费了 msg3那么再消费 msg1 ,消息顺序性也无法得到保障。同样可以用在Basic.Recover 这个 AMQP命令中。
注意
rabbitmq的消息顺序性是有很大局限的,在很多复杂的情况下并不成立。
思考
包括但不仅限于以上几种情形会使 RabbitMQ 消息错序 如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似 Sequence ID) 来实现。
弃用QueueingConsumer
消息传输保障
概念
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。
消息传输保障的3个层级
At most once: 最多一次。消息可能会丢失,但绝不会重复传输。
At least once: 最少一次。消息绝不会丢失,但可能会重复传输。
Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次。
补充
rabbitmq只支持“最多一次”和“最少一次”。
“最少一次”的消息传输保障需要考虑的问题
消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
消费者在消费消息的同时需要将 autoAck 设置为 false ,然后通过手动确认的方式去确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失。
“最多一次”的消息传输保障
消息只会发送一次,不会出现重复发送;可能出现的问题是消息可能会丢失
配置
环境变量
RabbitMQ的环境变量都是以"RABBITMQ_"开头的,可以在shell环境中设置,也可以在 rabbitmq-env.conf这个RabbitMQ环境变量的定义文件中设置。如果在非shell环境中配置,则需要将“RABBITMQ_”这个前缀去除。优先级顺序按照Shell环境最优先,其次rabbit-env.conf配置文件,最后是默认的配置。
配置文件rabbitmq.config
位置: 如果不存在配置文件,那么可以手动创建它
参数及策略
RabbitMQ web页面
设置是否开启web页面
启用命令
rabbitmq-plugins enable rabbitmq_management
禁用命令
rabbitmq-plugins disable rabbitmq_management
web页面内容
队列

Java API
maven仓库
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
项目
springboot AMQP
springboot整合rabbitmq