导图社区 RabbitMQ
RabbitMQ的概念,工作原理,如何使用?FAQ内容点梳理。导图内容简洁、逻辑清晰、重点突出,尤其适用于需要考试的学生党/上班族哦,大家一起快乐的学起来吧
编辑于2022-07-08 12:29:59RabbitMQ
概念
AMQP协议
Advanced Message Queuing Protocol 高级消息队列协议应用层协议的一个开放标准,为面向消息的中间件设计
核心概念
Publisher
消息生产者,用于向交换器Exchange发送消息
Server/Broker
消息服务端: 1.从生产者接收消息 2.将消息从Exchange路由给Queue 3.Queue持久化并转发消息给消费者
Virtual host
虚拟主机,用于进行逻辑隔离,最上层的消息路由
类似命名空间
虚拟主机是共享相同的身份认证和加密环境的独立服务器域,每个虚拟主机本质上就是一个mini版的RabbitMQ服务器,拥有自己的一系列的Exchange、Queue、Bingding和权限机制,连接时若不指定则默认为/
Exchange
交换机,接收生产者发送的消息,根据路由键转发消息到绑定的队列
有3种常见交换机类型: direct:直连,通过路由键精确匹配Queue(单播) topic:主题,通过路由键规则模糊匹配Queue(组播) fanout:广播,不依赖路由键,转发至所有Queue(广播)
* 匹配一个单词 # 匹配0或多个字符 * # 只能写在.号左右 单词和单词之间用.隔开
可持久化到磁盘(默认不持久化)。持久化到磁盘会严重影响性能,需根据自己的实际情况选择是否需要持久化
不指定Exchange,则使用默认交换机,它是一个名称为空的直连交换机。每个队列都会使用QueueName作为BindingKey绑定到默认交换机,故所有类型的Exchange都可只传入QueueName和消息发送消息
Binding
绑定,Exchange和Queue之间的虚拟连接,Binding中可包含路由表。一个Binding就是基于RoutingKey将Exchange和Queue连接起来的路由规则,可将Exchange理解成一个由Binding构成的路由表
BindingKey
绑定键,用于把队列绑定到交换机上
RoutingKey
路由键,一条路由规则,Exchange根据RoutingKey进行消息投递
生产者发送的消息中带有RoutingKey字段,交换机接收到消息后,根据这个RoutingKey和当前交换机中所有绑定的绑定键进行匹配,匹配成功则往该BindingKey所绑定的队列发送消息,从而实现同一个生产者可将消息分发到不同的队列
真实情况下参数名都是RoutingKey,没有BindingKey这个参数,为了区别用户发送的和我们绑定的概念,我们才说RoutingKey和BindingKey
发送消息时,指定Exchange、RoutingKey和消息,消息就会被路由至对应的Queue
Queue
消息队列,消息的容器,保存消息并将它们转发给消费者,消息一直存在队列中直至被取走,一个消息可投递给一个或多个队列
可持久化到磁盘(默认不持久化)。持久化到磁盘会严重影响性能,需根据自己的实际情况选择是否需要持久化
Message
消息,包含消息头和消息体,消息头可设置消息的分布式ID、延迟消息等信息,消息体存放消息内容
Connection
连接,应用程序与Broker的TCP网络连接
ConnectionFactory
建立连接很耗资源,我们不能每次需要连接时都创建一个Connection,于是我们使用完后并不立即将其关闭,而是将其放入ConnectionFactory中缓存起来,后面再用连接时我们从该Factory中找出Idle的Connection来用,连接池的作用
Channel
网络信道,TCP中虚拟连接。几乎所有操作都在Channel中进行,如发布消息、接收消息、订阅队列、读写消息。客户端可建立多个Channel,每个Channel代表一个会话任务(类似数据库中Connection中的Session),一条TCP连接上可创建多条Channel
为什么需要Channel?为什么不直接使用TCP通信? 1.TCP的创建和销毁开销很大,创建需要3次握手,销毁需要4次挥手 2.如果不用Channel,应用程序就需要用TCP连接RabbitMQ,高峰时每秒成千上万个TCP连接会造成巨大的资源消耗,而且操作系统每秒处理TCP的连接数也有限制,会造成性能瓶颈 3.Channel的原理就是一个线程一个Channel,多个线程多个Channel可共用一个TCP连接。一个TCP连接可容纳无限的Channel,即使每秒成千上万的请求也不会成为性能瓶颈
我们用多个Connection与多个vhost连接,而用同一个Connection的多个Channel与多个Exchange连接
Consumer
消息消费者,用于从消息队列获取消息
RabbitMQ使用Erlang语法开发,它是一种面向并发的编程语言,有着和原生Socket一样的延迟,使得客户端与服务端之间可低延迟地进行数据交互
TTL队列死信队列
TTL队列
TTL:Time To Live,生存时间
发送消息时可指定消息过期时间,从消息入队列开始计时,若超过了TTL时间会被自动清除
死信队列
消息在Queue中变成死信(Dead Message)后,会被重新publish到DLX(Dead Letter Exchage)
消息变成死信的几种情况
消息被拒绝
basic.reject/basic.nack,且requeue=false
消息TTL过期
队列达到最大长度
应用场景
延迟消息
1.将消息发送到设置了TTL(过期时间=延迟消费的时间)的正常队列 2.消费端不消费该正常队列,而消费死信队列 3.该消息在正常队列中会TTL过期,然后会被转发到死信队列中 4.消费者消费死信队列消息,从而实现了延迟消费消息
高可用
若给正常队列配置了死信队列,消息消费不成功后会进入到死信队列而不至于被丢弃
工作原理
架构
完整流程
1.生产者通过Channel携带Exchange、RoutingKey和消息参数发送消息给Exchange
2.Exchange发送ACK给生产者,并根据RoutingKey将消息路由至对应的Queue
3.Queue将消息发送给消费者
4.消费者发送ACK确认信息给Queue
5.Queue收到ACK后删除该条消息
如何使用
maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置
spring: rabbitmq: addresses: 192.168.137.128:5672,192.168.137.129:5672 username: root password: root virtual-host: transaction publisher-confirms: true #开启发送端确认机制 publisher-returns: true # 开启returns template: # 交换机没有匹配的队列时,会将消息返回给生产者,避免消息丢失 mandatory: true listener: simple: acknowledge-mode: manual retry: enabled: true max-attempts: 5 initial-interval: 3000
代码
发送端
配置类
定义Exchange、Queue和Binding
@Configuration public class RabbitmqConfig { @Bean public TopicExchange topicExchangeSm(){ return new TopicExchange(Exchange_Topic); } @Bean public Queue queueTopicSm(){ return new Queue(Queue_Topic); } @Bean public Binding bindingTopicSm(Queue queueTopic, TopicExchange exchangeTopic){ return BindingBuilder.bind(queueTopic).to(exchangeTopic).with(KEY_Topic); } }
可定义多个Exchange、Queue、Binding
Exchange : Queue = M : N
定义监听器
实现ConfirmListener
可实现 ConfirmListener/RabbitTemplate.ConfirmCallback 接口监听,以处理ACK/NACK确认回调
实现ReturnListener
可实现ReturnListener/RabbitTemplate.ReturnCallback接口监听,以处理Return回调
API发送
@Autowired private AmqpTemplate rabbitTemplate; public void send(String message) { rabbitTemplate.convertAndSend("Exchange", "RoutingKey", message); }
需传入Exchange、RoutingKey和消息
对于Fanout类型,虽然RoutingKey用不着,但也要传,可传入""或null
可发送字符串,也可发送对象,若对象需实现Serializable接口
消费端
@Component public class MQListener { @RabbitListener(queues = Queue_Direct) public void receiveDirectMsg(BaseMessage message) { // ... } @RabbitListener(queues = Queue_Topic) public void receiveTopicMsg(BaseMessage message) { // ... } }
@RabbitListener可加在类上,也可加在方法上 若加在类上,则接收消息方法需添加@RabbitHandler
服务若是集群,对于非Fanout类型会负载均衡(轮询)地接收消息
FAQ
如何保证消息的可靠性
消息的可靠性是指发送了就一定能收到,即不丢消息
自身机制
生产端可靠性保障
Confirm Listener
Broker收到生产者投递的消息后,会给生产者返回一确认消息,生产者若未接收到该确认消息可再次发送
可确认消息到达了Exchange
Return Listener
Return Listener用于处理一些不可路由的消息: 发送消息后,若Exchange不存在或Routing Key路由不到Queue时,可使用Return Listener监听这种不可达消息并做相应处理
可确认消息到达了Queue
生产端限流
若Broker中积压了大量消息未被消费,会限制发送端的发送速率
Broker可靠性保障
Exchange/Queue/Message持久化到磁盘
默认情况下,exchange、queue、message 等数据都是存储在内存中的,这意味着如果 RabbitMQ 重启、关闭、宕机时所有的信息都将丢失,RabbitMQ 提供了持久化来解决这个问题。 RabbitMQ 持久化包含3个部分: exchange 持久化,在声明时指定 durable 为 true queue 持久化,在声明时指定 durable 为 true message 持久化,在投递时指定 delivery_mode=2(1是非持久化) queue 的持久化能保证本身的元数据不会因异常而丢失,但是不能保证内部的 message 不会丢失,要确保 message 不丢失,还需要将 message 也持久化 如果 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的; 如果 exchange 和 queue 两者之间一个持久化,一个非持久化,则不允许建立绑定 如果将所有的消息都进行持久化操作,这样会严重影响 RabbitMQ 的性能: 写入磁盘的速度比写入内存的速度要慢很多,需在可靠性和吞吐量之间权衡 将 exchange、queue 和 message 都进行持久化后,也不能保证消息一定不会丢失,消息存入RabbitMQ 后,还需要一段时间才会刷盘,RabbitMQ 并不会为每条消息都进行同步存盘,如果在这段时间服务器宕机或重启,消息还没来得及保存到磁盘中就会丢失。对于这种情况,可引入 RabiitMQ 镜像队列机制 Spring RabbitMQ 是对原生 RabbitMQ 客户端的封装。一般情况下,我们只需定义 exchange 和 queue 的持久化 exchange 和 queue 的持久化应该配置在消息发送端,其实也可配置在消息消费端,RabbitListener 会去检查 exchange 和 queue,如果不存在则创建
死信队列不丢消息
高可用部署
消费端可靠性保障
消费端消息确认
1.消费端接收到消息并处理完成后,反馈给Broker,Broker收到ACK反馈后才将此消息从Queue中删除,否则将消息重新放入队列 2.消费端若是集群,则Broker会立即将该消息推送给其它在线的消费者 3.该ACK确认机制默认是打开的,可手动也可自动ACK确认
消费端限流
RabbitMQ提供了QoS(服务质量保证)机制,即在非自动确认消息的前提下,一定数目(可基于Consumer/Channel设置)的消息未被确认前,不消费新的消息,以防止消费端被瞬间消息冲垮
外部机制
其自身机制无法解决消息在发送到 Exchange的过程中丢失的极端情况,故需要再通过外部机制来保证这一点
消息表状态
在高并发场景下,数据库IO压力大
延迟投递
1.业务数据库持久化,并发送first send消息 2.同时发送一个延迟的检查消息 (用于检查第一次发送的消息的消费情况) 3.消费端消费消息 4.消费端发送一个确认消息给Broker 5.回调服务接收到消费端的确认消息,进行消息库的状态持久化 6.回调服务响应第二个延时消息,查消息库后若未成功则通知生产者再次发送
延迟时间在业务允许的范围内要足够久
互联网大厂一般采用延迟投递,做二次检查、回调检查
如何避免重复消费消息
为保证消息的幂等性,需在消费端识别出若该消息已消费过则放弃再次消费该消息
幂等性方案
消息头中携带分布式ID
消费端接收到消息后,第一时间先将消息入库,若入库失败(触发唯一性约束或返回0)说明是重复消息则放弃本消息
数据库检查SQL应如下: INSERT INTO msg_log WHERE NOT EXISTS (SELECT 1 FROM msg_log WHERE msg_id=YYY) 或者给msg_id字段加唯一性约束
除入库外也可入Redis,使用setnx命令
返回0说明key已存在,为重复消息,则放弃本消息
消息顺序性
每条消息加版本号/时间戳,若接收到消息的版本号/时间戳小于消息日志表/Redis中的版本号/时间戳,说明是先发后到的消息,则丢弃该消息
若版本号则需保证全局递增,若时间戳则需保证所有发送者时钟同步
多种MQ对比
MQ作用
服务解耦、流量削峰、异步处理
push or pull
RabbitMQ默认从发送者到Broker和Broker到消费者都是push模式,但也支持pull模式
Push方式消息延迟低、吞吐高,但消费端处理能力若跟不上,消费端的缓冲区会溢出,Pull方式则相反
是否支持事务消息
支持,但其事务是用于确保消息不丢失的
模式
transaction模式
txSelect() txCommit() txRollback()
confirm模式
普通confirm
批量confirm
异步confirm
说明
我们一般用异步confirm模式,因为其它模式是同步的,效率低
集群部署
分布式模式(默认)
各节点只Exchange/Queue等元数据一致,但队列数据不一致,类似数据分片
以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象
高可用模式
队列数据会在各节点间进行同步,从而各节点数据保持一致
把需要的队列做成镜像队列,存在于多个节点上。它与分布式模式不同在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用
说明
两种模式中,HAProxy都是可选的