导图社区 RocketMQ
本导图详细介绍了Rocketmq的相关信息,包括消息存储、服务组与理论、回溯消息、生产者、消息过滤、消费者等。
编辑于2022-04-08 13:46:58RocketMQ
常见问题
autoCreateTopicEnable机制
前提条件 broker启动时会判断是否开启主题自动创建,若开启则会构建默认主题的路由信息(队列数默认8)并发送给NameServer 流程 消费者发送消息,主题topicA --->发现本地缓存中无对应topicA的路由信息 --->消费者客户端向NameServer获取topicA路由信息 --->NameServer发现没有topicA路由信息,因此返回空 --->消费者客户端使用默认主题再次查询路由信息 --->NameServer返回默认主题路由信息(1、broker若开启了自动创建,则启动时已经构建了默认主题的路由信息并已注册到NameServer),如果未找到默认主题路由信息则返回空 --->若默认主题返回空,则报空主题异常;不为空时,客户端选择默认主题的一个队列,并构造消息(包含默认主题、原主题、模式等) --->broker收到默认主题的消息后,会提取出原主题并构造该主题的路由信息 --->构建好的路由信息定时发送到NameServer --->客户端再次发送消息时就会拿到该主题的消息队列 特别注意:(先不考虑高并发场景下) 1、使用默认主题获取队列,此时只获取了一个队列,然后客户端向这个队列发送默认主题的消息,显然只有该队列所在的服务器能够创建原主题的消息队列,如果在第二次消息发送前被注册到了NameServer,那么以后发送消息都会发送到当前服务器,其他服务器就没有机会创建消息队列。 2、broker启动时默认主题默认创建8个消息队列,而客户端默认创建4个。通过默认主题创建消息队列时,broker默认值和客户端默认值取最小。 3、broker会定时把内存中的路由信息刷新到磁盘
ack卡进度
消费者(Consumer)
消费模式
集群模式(默认)
同一个消费组下,同一条消息只能被其中一个消费者消费
消费进度保存在Broker
广播模式
监听即可消费
消费进度保存在消费端
消息获取方式
推模式PUSH
正常理解,推模式就是消息队列将消息推送给消费者。但在RocketMQ中,推模式是在pull模式上包装了一层,一个拉取任务完成后再拉取下一个。
拉模式PULL
消息过滤模式
表达式(TAG/SQL92)
类过滤
重新分布机制
消息队列负载
一个消息队列通常只允许被一个消费者消费
一个消费者同时可以消费多个消息队列
消费方式
普通消费
顺序消费
只有当前消费成功,才消费下一条,没有RECONSUME_LATER
只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费
定时消息
重试机制
默认重试16次
消息消费失败,该消息本身会被ack,保证消费不被阻塞。同时被重新发回给broker,broker将其放在重试队列,走重试流程
达到重试次数后,放入死信队列
1 10秒 9 7分钟 2 30秒 10 8分钟 3 1分钟 11 9分钟 4 2分钟 12 10分钟 5 3分钟 13 20分钟 6 4分钟 14 30分钟 7 5分钟 15 1小时 8 6分钟 16 2小时
触发条件
Exception异常
一般重复16次 10s、30s、1mins、2mins、3mins等
超时
超时情况,MQ会无限制的发送消息给消费端
进度未提交
返回null
订阅重试消息
主题名:%RETRY%+消费组名
消费者启动的时候会自动订阅该主题
注意
消费确认机制
ack可能丢失,所以存在重复消费问题
特点
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接
定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接
并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉
线程模型
单Consumer实例+多worker线程模型
为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理
从哪儿消费
策略
CONSUME_FROM_LAST_OFFSET(默认)
从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET
从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
不一定从offset=0开始,因为历史消息只保留一部分
CONSUME_FROM_TIMESTAMP
从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
注意
以上策略只针对新加入的组,已经存在的组不生效
已存在的组会按实际消费进度消费
进度管理机制
group+queue为单位
一个group可能订阅多个topic、tag,但本质上是一个group对应多个queueId
问题
进度如何跟tag结合?
broker收到拉取请求,此时在一定范围内过滤出相应的消息(即拉取的一批消息下标不一定是连续的),消费完成后,ack最大下标即可
消费进度如何保存?
本地保存
定时任务同步到broker
Ack卡进度怎么办?
生产者(Producer)
单个JVM有个MQClientManager实例,维护map<String,MQClientInstance>,一个clientId只会创建一个MQClientInstance 其中 clientId=客户端IP+instance(+unitname可选),若instance为默认值,则会使用进程ID 同一个JVM不同的消费者、生产者在启动时获取到的MQClientInstance实例是同一个
消息发送流程
验证消息
主题不能为空
消息体不能为空
消息长度不能为0且最大不能超过4M
查找路由
查找
第一次发送消息时,本地没有路由缓存,消息发送之前需要先通过主题名称从NameServer获取该主题的路由信息,若未找到,则尝试用默认主题查询,如果autoCreateTopicEnable为true,则NameServer返回路由信息,否则抛出未找到路由的异常。 1、通过默认主题找的路由信息 替换路由信息中读写队列个数为消息生产者默认的队列个数(可配置) 2、通过topic找到路由信息 与本地缓存的路由信息对比并更新 注意: 1、从NameServer获取路由信息以后,会做本地缓存,第二次直接取缓存 2、客户端会产生一个定时任务,默认30秒从NameServer获取一次路由信息并更新本地缓存
选择
一次消息发送,可能会执行多次队列选择 第一次选择队列1,发送消息失败,此时会把该失败队列放到失败队列中,并重新选择队列,第二次选择时会规避第一次失败的队列
故障延迟机制
发送
1、为消息体生成全局唯一ID 2、消息体超过4K则启用zip压缩
从nameServer获取broker服务地址列表
消息发送
发送方式
同步
指:消息发送后,必须等待服务器响应,只有结果允许的情况下才可执行下一步 引入重试机制
异步
指:消息发送后无需等待服务器响应,只需提供一个回调方法,待服务器响应或者超时后的结果处理。 出现网络异常、超时等时不进行重试
单向
指:调用消息发送API后无需等待服务器返回发送结果,也无提供回调方法 无重试机制
批量发送
重试机制
异步发送
根据设定的次数进行重试
同步发送
不重试
如果发送的过程中超时,则直接抛出异常,不重试(有可能是重试的过程中)
特点
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接
如何切换
定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接
Group
消息存储
所有主题的消息均存储在一个文件上,以保证消息发送时顺序写文件。然而消息消费是基于主题,为了提高消费效率,引入了ConsumeQueue消息队列文件,每个消息队列都有自己的消息队列文件
存储文件
主要
消息文件
Comitlog
消息存储文件,将 位置: 每个文件默认1G(大小可修改),
所有主题的消息都存储在该文件中
只允许顺序写
即串行写入
根据偏移量和消息长度在文件中查找消息
默认是1GB
存满新建,以该文件的第一个偏移量为文件名
位置
${}/store/commitlog(位置可修改)
消息队列文件
ConsumeQueue
消息消费队列,消息到达comitlog后,将异步发送到该文件,供消费者消费,可以将该文件看做commitlog文件的索引 位置:${ROCKETMQ_HOME}/store/consumeQueue 每个主题一个子目录,每个主题有多个消息队列,所以该主题目录下又根据消息队列的ID依次创建了子目录,如topic-test/0 topic-test/1 存储结构:commitlog-offset:message-size:tag-hashcode ConsumeQueue并不全量存储消息,只存储对应消息在commitlog文件中的偏移量+消息长度+tag-hashcode 问题 1、整个消息全量异步发送吗?
Hash索引文件
IndexFile
IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法
为了加速消息的检索性能
检测点文件
checkPoint
关闭异常文件
abort
其他
checkPoint
消息文件过期机制
commitLog和ConsumeQueue公用一套过期删除机制
判断
如果非当前写文件在一定时间内没有再被更新,则被当作过期文件
默认72小时
mq默认10s判断一次
删除(满足任何一个)
到达时间点
默认凌晨4点
磁盘空间不足
预留的手工触发
特别注意
删除时不会判断文件中的消息是否已经被消费
消息文件存储空间报警机制
同步/异步双写
同步/异步复制
刷盘
同步
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
优点
数据可靠性很高
缺点
写入慢,并发低
异步
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
优点
写入很快,并发高
缺点
如果内存刷入磁盘前发生故障,数据容易丢失
消息过滤FilterServer
表达式
TAG
基于Topic+tag,比较好理解
SQL92
对消息的属性运用SQL过滤表达式进行条件匹配
类
主从同步(HA)机制
RocketMQ主从复制原理
RocketMQ读写分离机制
实战
消息批量发送
消息发送队列自选择
消息过滤
事务消息
Spring整合RocketMQ
Spring Cloud整合RocketMQ
RocketMQ监控与运维命令
应用场景分析
常见问题
面试
服务组成与理论
NameServer负责broker的注册,主题路由信息的收集等 broker每30秒向NameServer发送心跳包,包中包含broker所拥有的的路由信息,NameServer收集所有broker的路由信息并整理 消息生产者客户端每30秒向NameServer获取路由信息并更新本地缓存的路由,在发送消息前如果不存在该主题的路由则会主动从NameServer获取该主题路由信息
路由中心NameServer
默认端口 9876 Broker的注册,Broker的路由,Broker生命周期的管理 作用: 1、为生产者、消费者提供关于主题Topic的路由信息 2、Nameserver相当于broker的注册中心,在集群结构中,NameServer之间并不通信。 3、NameServer与所有Broker保持长链接,每隔10秒检测一次本地的Broker信息,如果连续120S没有收到Broker的心跳包,则将移除该broker的路由信息并关闭socket连接(Broker每30秒向NameServer发送心跳)
部署
Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。 部署时,可以启动多个,相互之间完全独立
配置
数据类型
路由信息
broker服务器信息
功能
路由管理
broker每30秒向NameServer发送心跳,包含路由信息
服务注册与发现
特点
nameserver之间不通信
与broker保持长链接,30S心跳
Name Server定时(每隔10s)扫描所有存活broker的连接
Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接
Broker
Broker正常关闭时会告诉NameServer
属性
brokerAddr
可以手动指定broker的ip地址,在向NameServer注册的时候如果配置了brokerAddr则注册该地址
brokerId
0:master 大于0:slave
brokerName
broker名称
clusterName
broker所在的集群名称,通过该名称区分broker所在的集群 每个broker在
haServerAddr
master主节点地址,初次请求该值为空,slave向NameServer注册后返回并赋值
启动流程
1、在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息 注意:此时默认主题队列数为8 2、Broker启动时向所有配置的NameServer发送心跳信息,每隔30秒向集群中所有的NameServer发送心跳包 3、心跳包包含 brokerId、名称、网络地址、集群名称、关联的FilterServer地址列表,broker所拥有的的路由信息(包括默认路由)
部署
master
BrokerId为0
slave
BrokerId非0
概要
通过指定相同的Broker Name关联
配置
客户端
路由发现
1、当NameServer的路由信息发生变化时,不会主动推给客户端,而是由客户端定期根据主题获取路由信息
扩容
主从同步机制(HA)
主服务器启动,并监听从服务器的连接 从服务器主动链接主服务器,并携带待拉取的偏移量Master与slave的HA心跳发送间隔是5s
同步主从
消息发送者将消息刷到磁盘以后,需要继续等待数据传输到从服务器
异步主从
数据到达主broker后并完成持久化后理解返回,然后异步同步到从broker
读写分离机制
消费者先向主服务器发起拉取消息请求,然后主服务器返回一批消息,同时会根据主服务器负载压力与主从同步情况,然后建议客户端下次拉取消息是从主服务器还是从服务器拉取
回溯消息
已消费消息可再次消费,支持按时间回溯消费
消息
类型
延时消息
延迟级别大于0,则将消息的原主题和原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC存储,到时间则将消息发送出去
事务消息
事务消息实现思想
两阶段提交
定时事务状态会查
决定提交还是回滚
事务消息发送流程
提交或回滚事务
事务消息回查事务状态
组成
msgId