导图社区 RockectMQ分布式消息中间件:核心原理与最佳实践
RockectMQ分布式消息中间件:核心原理与最佳实践,读书笔记整理,RocketMQ分布式消息中间件:核心原理与最佳实践. 李伟. 本书源码以RocketMQ 4.2.0和RocketMQ 4.3.0为基础,从RocketMQ的实际使用到RocketMQ的源码分析。
编辑于2022-09-10 16:27:35 江苏省RockectMQ分布式消息中间件: 核心原理与最佳实践
1. 综述

1.1. 什么是消息队列
本质是两个进常传递信息的一种方法,两个进程可以处于同一台机器,也可以分布在不同的机器
1.2. 为什么需要消息队列
削峰填谷
订票请求
解耦
多服务之间解耦
异步处理
订票30分钟内付款
数据最终一致性
跨行转账
1.3. 常见消息队列

ActiveMQ
Kafka
RocketMQ
Pulsar
1.4. 发展史与未来
前世
阿里巴巴 MetaQ
云化
参考Kafka设计
2016年上线云RocketMQ
毕业
2016年11月捐献给Apache
2017年9月25日,RocketMQ孵化成功
未来
金融级消息队列
亚毫秒级延迟,万亿级消息容量保证,高消息容错设计
借助OpenMessaging提供跨平台,多言的能力
打通Prometheus、ELK等上游组件
通过消息、Streaming等形式将数据流转到Flink、Elasticsearch、Hbase、Spark等下游组件
2. 生产者原理和最佳实践
2.1. 生产者原理
生产者概述
生产者组
一个逻辑概念,在使用生产者实例的时候需要指定一个组名
一个生产者组可以生产多个Topic的消息
生产者实例
生产者组部署了多个进程,每个进程都是一个生产者实例
Topic
主题,一个Topic由若干个Queue组成
实现类
org.apache.rocketmq.client.producer.DefaultMQProducer
用于生产普通消息、顺序消息、单项消息、批量消息、延迟消息
org.apache.rocketmq.client.producer.TransactionMQProducer
用于生产事务消息
消息结构和消息类型
消息结构

Topic:主题名字,可以通过RocketMQ Console创建。
Flag:目前没用。
Properties:消息扩展信息,Tag、keys、延迟级别都保存在这里。
Body:消息体,字节数组。需要注意生产者使用什么编码,消费者也必须使用相同编码解码,否则会产生乱码。
setKeys():设置消息的key,多个key可以用MessageConst.KEY_SEPARATOR(空格)分隔或者直接用另一个重载方法。如果 Broker 中 messageIndexEnable=true 则会根据 key创建消息的Hash索引,帮助用户进行快速查询。
setTags():消息过滤的标记,用户可以订阅某个Topic的某些Tag,这样Broker只会把订阅了topic-tag的消息发送给消费者。
setDelayTimeLevel():设置延迟级别,延迟多久消费者可以消费。
putUserProperty():如果还有其他扩展信息,可以存放在这里。内部是一个Map,重复调用会覆盖旧值。
消息类型
普通消息
普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产消费都是并行进行的,单机性能可达十万级别的TPS。
分区有序消息
与Kafka中的分区类似,把一个Topic消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。
全局有序消息
如果把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息就是单分区,所有消息都遵循FIFO(先进先出)的原则。
延迟消息
消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中只需要在发送消息时设置延迟级别即可实现。
事物消息
主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交(Commit)消息或者回滚(Rollback)消息优雅地实现分布式事务。
生产者高可用
客户端保证
第一种保证机制:重试机制。RocketMQ 支持同步、异步发送,不管哪种方式都可以在配置失败后重试,如果单个 Broker 发生故障,重试会选择其他 Broker 保证消息正常发送。
retryTimesWhenSendFailed:表示同步重试次数,默认为 2次,加上正常发送 1次,总共3次机会。
retryTimesWhenSendAsyncFailed:表示异步重试的次数,默认为 2 次,加上正常发送的1次,总共有3次发送机会。
第二种保证机制:客户端容错。RocketMQ Client会维护一个“Broker-发送延迟”关系,根据这个关系选择一个发送延迟级别较低的 Broker 来发送消息,这样能最大限度地利用 Broker 的能力,剔除已经宕机、不可用或者发送延迟级别较高的 Broker,尽量保证消息的正常发送。
sendLatencyFaultEnable:发送延迟容错开关,默认为关闭,如果开关打开了,会触发发送延迟容错机制来选择发送Queue。
发送Queue时如何选择呢?
第一步:获取一个在延迟上可以接受,并且和上次发送相同的Broker。首先获取一个自增序号index,通过取模获取Queue的位置下标 Pos。如果 Pos对应的 Broker的延迟时间是可以接受的,并且是第一次发送,或者和上次发送的Broker相同,则将Queue返回。
第二步:如果第一步没有选中一个Broker,则选择一个延迟较低的Broker。
第三步:如果第一、二步都没有选中一个Broker,则随机选择一个Broker。
Broker端保证
数据同步方式保证 Broker主从复制分为两种:同步复制和异步复制。
同步复制是指消息发送到Master Broker后,同步到Slave Broker才算发送成功;
异步复制是指消息发送到Master Broker,即为发送成功。
2Master、2Slave详细分析
(1)1个Slave掉电。Broker同步复制时,生产第一次发送失败,重试到另一组Broker后成功;Broker异步复制时,生产正常不受影响。
(2)2个 Slave掉电。Broker同步复制时,生产失败;Broker异步复制时,生产正常不受影响。
(3)1 个 Master 掉电。Broker 同步复制时,生产第一次失败,重试到另一组 Broker后成功;Broker异步复制时的做法与同步复制相同。
(4)2个Master掉电。全部生产失败。
(5)同一组Master和Slave掉电。Broker同步复制时,生产第一次发送失败,重试到另一组Broker后成功;Broker异步复制时,生产正常不受影响。
(6)2组机器都掉电:全部生产失败。
综上所述,想要做到绝对的高可靠,将 Broker 配置的主从同步进行复制即可,只要生产者收到消息保存成功的反馈,消息就肯定不会丢失。一般适用于金融领域的特殊场景。绝大部分场景都可以配置Broker主从异步复制,这样效率极高。
2.2. 生产者启动流程

2.3. 消息发送流程

3层
业务层:通常指直接调用RocketMQ Client发送API的业务代码。
消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作。
通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层。
总体上讲,消息发送流程首先是 RocketMQ 客户端接收业务层消息,然后通过DefaultMQProducerImpl发送一个RPC请求给Broker,再由Broker处理请求并保存消息。
3步
第一步:调用defaultMQProducerImpl.send()方法发送消息。
第二步:通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息。设置的超时时间可以通过sendMsgTimeout进行变更,其默认值为3s。
第三步:执行defaultMQProducerImpl.sendDefaultImpl()方法。
第一步,两个检查:生产者状态、消息及消息内容。没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置。
第二步,执行tryToFindTopicPublishInfo()方法:获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,就通过Namesrv获取路由信息,更新到本地,再返回。
第三步,计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。
第四步,执行队列选择方法selectOneMessageQueue()。根据队列对象中保存的上次发送消息的Broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到Broker。我们可以通过sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的 Broker,默认值为False。
第五步,执行sendKernelImpl()方法。该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层,内部实现是基于Netty的,在封装为通信层request对象RemotingCommand前,会设置RequestCode表示当前请求是发送单个消息还是批量消息。
2.4. 最佳实践
发送普通消息

发送顺序消息

发送延迟消息

发送事务消息
   
发送单向消息

发送批量消息

2.5. 最佳实践总结
相对消费者而言,生产者的使用更加简单,一般读者主要关注消息类型、消息发送方法和发送参数,即可正常使用RocketMQ发送消息。
常用消息类型

常用的发送方法
 
常用参数

3. 消费流程和最佳实践
3.1. 消费者概述
消费流程

消费者组:一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。
消费者实例:一个消费者组程序部署了多个进程,每个进程都可以称为一个消费者实例。
订阅关系:一个消费者组订阅一个 Topic 的某一个 Tag,这种记录被称为订阅关系。RocketMQ规定消费订阅关系(消费者组名-Topic-Tag)必须一致——在此,笔者想提醒读者,一定要重视这个问题,一个消费者组中的实例订阅的Topic和Tag必须完全一致,否则就是订阅关系不一致。订阅关系不一致会导致消费消息紊乱。
消费模式
集群模式(默认)
消费者组下的全部消费者实例共同消费Topic的全部消息。
集群模式的消费进度是保存在Broker端的,所以即使应用崩溃,消费进度也不会出错。
适用于异步通信、削峰等对消息没有顺序要求的场景。
广播模式
消费者组中的每个消费者实例都将消费整个 Topic 的全部消息。
消费进度保存在客户端机器的文件中。如果文件弄丢了,那么消费进度就丢失了,可能会导致部分消息没有消费。
比较适合各个消费者实例都需要通知的场景,比如刷新应用服务器中的缓存。
可靠消费
消费侧通过重试-死信机制、Rebalance机制等多种机制保证消费的可靠性。
重试-死信机制

正常Topic
正常消费者订阅的Topic名字。
重试Topic

消息消费失败会自动被保存到重试Topic中,格式为“%RETRY%消费者组”,在订阅的时候会自动订阅这个重试Topic。
进入重试队列的消息有16次重试机会,每次都会按照一定的时间间隔进行,即去掉前两个延迟等级的延迟间隔时间
死信Topic
死信Topic名字格式为“%DLQ%消费者组名”。
正常消费1次失败,重试16次失败,消息会被保存到死信Topic中,进入死信Topic的消息不能被再次消费。
Rebalance机制
用于在发生Broker掉线、Topic扩容和缩容、消费者扩容和缩容等变化时,自动感知并调整自身消费,以尽量减少甚至避免消息没有被消费。
3.2. 消费者启动机制
Pull消费(弃用)
实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
defaultMQPullConsumerImpl.start()方法中的启动过程

第一步:最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JUST,然后设置消费者的默认启动状态为失败。
第二步:检查消费者的配置比,如消费者组名、消费类型、Queue分配策略等参数是否符合规范;将订阅关系数据发给Rebalance服务对象。
第三步:校验消费者实例名,如果是默认的名字,则更改为当前的程序进程id。
第四步:获取一个 MQClientInstance,如果 MQClientInstance 已经初始化,则直接返回已初始化的实例。这是核心对象,每个clientId缓存一个实例。
第五步:设置Rebalance对象消费者组、消费类型、Queue分配策略、MQClientInstance等参数。
第六步:对 Broker API 的封装类 pullAPIWrapper进行初始化,同时注册消息,过滤filter。
第七步:初始化位点管理器,并加载位点信息。位点管理器分为本地管理和远程管理两种,集群消费时消费位点保存在 Broker 中,由远程管理器管理;广播消费时位点存储在本地,由本地管理器管理。
通用流程
第八步:本地注册消费者实例,如果注册成功,则表示消费者启动成功。
第九步:启动MQClientInstance实例。
Push消费
实现类:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
启动流程

前七步同上
第八步:初始化消费服务并启动。之所以用户“感觉”消息是 Broker 主动推送给自己的,是因为DefaultMQPushConsumer通过Pull服务将消息拉取到本地,再通过Callback的 形 式,将本地消息Push给用户的消费代码。DefaultMQPushConsumer 与DefaultMQPullConsumer获取消息的方式一样,本质上都是拉取。
并行消费服务
实现类:org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
顺序消费服务
实现类:org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
DefaultMQPushConsumer根据用户监听器继承的不同接口初始化不同的消费服务程序
第九步:启动MQClientInstance实例。
第十步:更新本地订阅关系和路由信息;通过 Broker 检查是否支持消费者的过滤类型;向集群中的所有Broker发送消费者组的心跳信息。
第十一步:立即执行一次Rebalance
3.3. 消费者的Reblance机制
消费者下线的Rebalance过程

doRebalance过程(以Push方式为例)
第一步:查找当前 clientId对应的全部的消费者组,全部执行一次 Rebalance。
第二步:判断 Rebalance 开关,如果没有被暂停,则调用 RebalancePushImpl.rebalance()方法。
第三步:在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的Topic,循环对每个Topic进行Rebalance。待全部的Rebalance都执行完后,将不属于当前消费者的队列删除。
第四步:Topic队列重新分配。以集群方式为例
(1)获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll)。只有当两者都不为空时,才执行Rebalance。
(2)将全部的MessageQueue(代码中是mqAll)和消费者客户端(cidAll)进行排序。由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,保证所有消费者客户端在做 Rebalance 的时候,看到的 MessageQueue 列表和消费者客户端都是一样的视图,做Rebalance时才不会分配错。
(3)按照当前设置的队列分配策略执行 Queue 分配。
5种队列分配实现方式
AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
AllocateMessageQueueAveragelyByCircle:环形分配策略。
AllocateMessageQueueByConfig:手动配置。
AllocateMessageQueueConsistentHash:一致性Hash分配。
AllocateMessageQueueByMachineRoom:机房分配策略。
(4)动态更新ProcessQueue。在队列重新分配后,当前消费者消费的队列可能不会发生变化,也可能发生变化,不管是增加了新的队列需要消费,还是减少了队列,都需要执行updateProcessQueueTableInRebalance()方法来更新ProcessQueue。如果有MessageQueue不再分配给当前的消费者消费,则设置 ProcessQueue.setDropped(true),表示放弃当前MessageQueue 的 Pull 消息。
(5)执行messageQueueChanged()方法。如果有MessageQueue订阅发生变化,则更新本地订阅关系版本,修改本地消费者限流的一些参数,然后发送心跳,通知所有Broker,当前订阅关系发生了改变。
3.4. 消费进度保存机制
位点管理方式
远程位点管理
集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中
本地位点管理
广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中
位点持久化方式
定时持久化
定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的消费进度。
定时持久化位点实现方法是org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask()
不定时持久化
不定时持久化也叫 Pull-And-Commit,也就是在执行 Pull方法的同时,把队列最新消费位点信息发给 Broker
具体实现代码在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()方法中。
第一处,在拉取完成后,如果拉取位点非法,则此时客户端会主动提交一次最新的消费位点信息给Broker,以便下次能使用正确的位点拉取消息
第二处,在执行消息拉取动作时,如果是集群消费,并且本地位点值大于0,那么把最新的位点上传给Broker
理论上位点信息越是及时上报 Broker,越能减少消息重复的可能性。RocketMQ在设计时并不完全支持Exactly-Once的语义,因为实现该语义的代价颇大,并且使用场景极少,再加上用户侧实现幂等的代价更小,故而RocketMQ在设计时将幂等操作交与用户处理。
3.5. 消费方式
Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求。在 RocketMQ 中 org.apache.rocketmq.client.consumer.DefaultMQPullConsumer是默认的Pull消费者实现类。Push 方式:代码接入非常简单,适合大部分业务场景。缺点是灵活度差,在了解其消费原理后,排查消费问题方可简单快捷。在 RocketMQ 中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类。 
Push消费

第一步:初始化 Push消费者实例。业务代码初始化 DefaultMQPushConsumer实例,启动Pull服务PullMessageService。该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中。
第二步:消费消息。由消费服务 ConsumeMessageConcurrentlyService (并发消费实现类)或者ConsumeMessageOrderlyService (顺序消费实现类)将本地缓存队列中的消息不断放入消费线程池,异步回调业务消费代码,此时业务代码可以消费消息。
 消费消息主要分为消费前预处理、消费回调、消费结果统计、消费结果处理4个步骤。
第一步:消费执行前进行预处理。执行消费前的 hook 和重试消息预处理。消费前的hook可以理解为消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic名重置为原来的Topic名,而不用重试Topic名。
第二步:消费回调。首先设置消息开始消费时间为当前时间,再将消息列表转为不可修改的List,然后通过listener.consumeMessage(Collections.unmodifiableList(msgs),context)方法将消息传递给用户编写的业务消费代码进行处理。
第三步:消费结果统计和执行消费后的hook。客户端原生支持基本消费指标统计,比如消费耗时;消费后的 hook和消费前的 hook要一一对应,用户可以用消费后的 hook统计与自身业务相关的指标。
第四步:消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数加1;消费位点处理主要根据消费结果更新消费位点记录。
第三步:保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,由消费进度管理服务定时和不定时地持久化到本地(LocalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中。对于消费失败的消息,RocketMQ客户端处理后发回给Broker,并告知消费失败。
Pull消费

第一步:fetchSubscribeMessageQueues(String Topic)。拉取全部可以消费的Queue。如果某一个Broker下线,这里也可以实时感知到。
第二步:遍历全部Queue,拉取每个Queue可以消费的消息。
第三步:如果拉取到消息,则执行用户编写的消费代码。
第四步:保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消费位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度。
3.6. 消息过滤
RocketMQ 设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,Broker就不会传输给客户端,以免浪费宽带。
Tag过滤

第一步:用户发送一个带Ta g的消息。
第二步:用户订阅一个Topic的Tag,RocketMQ Broker会保存订阅关系。
第三步:在 Broker 端做 Tag 过滤。消费者在 Pull 消息时,RocketMQ Broker 会根据Tag的Hashcode进行对比。如果不满足条件,消息不会返回给消费者,以节约宽带。
第四步:客户端Tag过滤。Hash碰撞相信大家都有所了解,就是不同的Tag计算出来的 Hash 值可能是一样的,在这种情况下过滤后的消息是错误的,所以 RocketMQ 设计了客户端字符串对比功能,用来做第二次Ta g过滤。
Tag过滤为什么设计成 Broker端使用 Hash过滤,而客户端使用Tag字符串进行对比过滤呢?Broker端使用Hash过滤可以快速过滤海量消息,即使偶尔有“落网之鱼”,在客户端字符串过滤后也能被成功过滤。
SQL92过滤

第一步:消费者订阅Topic,上传过滤SQL语句,RocketMQ Broker编译SQL保存。
第二步:消费者Pull消息。
第一次过滤:使用Bloom过滤器的isHit()方法做第一次过滤。Bloom过滤器效率高,但是也存在缺陷,即只能判断不需要的消息,过滤后的消息也不保证都是需要消费的。
第二次过滤:执行编译后的SQL方法evaluate()即可过滤出最终的结果。
Filter Server过滤

第一步:用户消费者从Namesrv获取Topic路由信息,同时上传自定义的过滤器实现类源代码到FilterServer中,Filter Server编译并实例化过滤器类。
第二步:用户发送拉取消息请求到Filter Server,Filter Server通过pull consumer从Broker拉取消息,执行过滤类中的过滤方法,返回过滤后的消息。
3.7. 消费者最佳实践总结
排查不能消费问题

第一步:确认哪个消息未消费。
在这时消费者至少需要收集消息id、消息key、消息发送时间段三者之一。
第二步:确认消息是否发送成功。
可以通过消息id、消息key、消息时间段等任意一个条件在社区提供的 RocketMQ Console查找消息。如果查到消息,说明问题在消费者自身。此时消费者可以做如下检查,确认问题:
(1)订阅的Topic和发送消息的Topic是否一致,包含大小写一致。
(2)订阅关系是否一致。
(3)消费代码是否抛异常了,导致没有记录日志。
(4)消费者服务器和Namesrv或者Broker是否网络通畅。
第三步:如果在第二步中没有查到消息,说明生产者没有生产成功。
消息没有生产成功的问题可能是生产者自身的问题,也可能是 Namesrv 或者 Broker 问题导致消息发送失败。此时生产者可以做如下检查:
(1)确认生产者服务器与Namesrv或Broker网络是否通畅。
(2)检查生产者发送日志,确认生产者是否被流控。
(3)检查Broker日志,确认Broker是否繁忙。
(4)检查Broker日志,确认磁盘是否已满。
4. 架构和部署最佳实践
4.1. 架构

使用者:一般是指生产、消费程序的直接研发人员、RocketMQ中间件的维护人员等。
Console管理平台:管理RocketMQ生产者组、Topic、消费者组和RocketMQ元数据的平台。管理平台可以自研,也可以基于社区提供的 RocketMQ Console二次开发而来,或者直接使用社区提供的RocketMQ Console。
Namesrv集群:一个无状态的元数据管理,Namesrv之于RocketMQ等价于Zookeeper之于Kafka。
Broker 集群:消息中间商或消息代理。主要用于保存消息,处理生产者、消费者的各种请求的代理。包含Master和Slave两种角色,与MySQL中的主从角色类似。
生产者集群:消息发送方,通常由一个或多个生产者实例组成。
消费者集群:消息接收方,通常由一个或多个消费者实例组成。
4.2. 部署拓扑和部署实践
常用拓扑图
Namesrv部署
推荐一个集群并部署2~3个Namesrv节点。
Broker部署
第一种:单 Master。
“集群”中只有一个节点,宕机后不可用。通常用于个人入门学习,比如测试发送消息代码、测试消费消息代码等,建议在生产环境中不要使用这种部署方式。
第二种:单 Master,单 Salve。
单主从模式,Master 宕机后集群不可写入消息,但可以读取消息。通常用于个人深入学习,比如研究源码、设计原理等,建议在生产环境中不要使用这种部署方式。
第三种:多 Master,无 Salve。
该种部署方式性能最好,并且当单个 Master 节点宕机时,不影响正常使用。
第四种:多Master、多Slave,异步复制。
在第三种方式上增加了Slave,当一个Master节点宕机时,该Master不能写入消息,消费可以在其对应的Slave上进行。新消息的生产、消费不受影响。添加Salve后,消费者可以从对应的Slave中读取已发送到宕机Master中的消息。生产环境中可以使用这种部署方式。
第五种:多 Master、多 Slave,同步复制。
这种部署方式完全解决了第四种部署方式的弊端,虽然由于Master-Salve同步复制导致发送消息耗时增加,集群性能大大下降,但是这仍然是最可靠的部署方式。生产环境中可以使用这种部署方式。
同步复制、异步复制和同步刷盘、异步刷盘

复制是指Broker与Broker之间的数据同步方式。分为同步和异步两种,同步复制时,生产者会等待同步复制成功后,才返回生产者消息发送成功;异步复制时,消息写入Master Broker后即为写入成功,此时系统有较低的写入延迟和较大的系统吞吐量。
刷盘是指数据发送到Broker的内存(通常指PageCache)后,以何种方式持久化到磁盘。同步刷盘时,生产者会等待数据持久化到磁盘后,才返回生产者消息发送成功,可靠性极强;异步刷盘时,消息写入PageCache即为写入成功,到达一定量时自动触发刷盘。此时系统有非常低的写入延迟和非常大的系统吞吐量。
部署实践
1.Namesrv部署
第一步:修改Namesrv日志目录和Namesrv启动配置文件。
日志配置文件目录:./conf/logback_namesrv.xml。Namesrv启动配置文件目录:./conf/namesrv.conf。
第二步:启动Namesrv,启动命令如下。

第三步:验证启动结果。
查看 Namesrv的日志文件 namesrv.log的内容,如果内容包含The Name Server bootsuccess.serializeType=XXX,则说明启动成功。
2.Master Broker部署
第一步:修改日志配置文件,保存目录和启动配置文件。
日志配置文件路径:./conf/logback_broker.xml。
启动配置文件路径:./conf/broker.conf。
第二步:启动master

第三步:验证master启动结果。
在Broker日志目录下会生成12个日志文件
broker_default.log、broker.log、commercial.log、filter.log、lock.log、protection.log、remoting.log、stats.log、storeerror.log、store.log、transaction.log、watermark.log。
3.Slave Broker部署
第一步:修改日志配置文件,保存目录和启动配置文件,修改启动配置文件中的两个配置项为:brokerId=1,brokerRole=SLAVE,其他配置项建议和Master Broker保持一致。
第二步、第三步:与Master Broker完全一致。
4.部署社区版RocketMQ Console管理平台
第一步:启动配置文件修改。
RocketMQ Console是一个Springboot项目,其配置文件是application.properties,具体修改配置项如下:

第二步:RocketMQ Console启动命令如下:

第三步:查看启动结果。
5. Namesrv
5.1. Namesrv概述
什么是Namesrv
Namesrv是专门针对RocketMQ开发的轻量级协调者,多个Namesrv节点可以组成一个Namesrv集群,帮助RocketMQ集群达到高可用。
Namesrv的主要功能是临时保存、管理Topic路由信息,各个Namesrv节点是无状态的,即每两个Namesrv节点之间不通信,互相不知道彼此的存在。在Broker、生产者、消费者启动时,轮询全部配置的 Namesrv 节点,拉取路由信息。
Namesrv核心数据结构和API
Namesrv中保存的数据被称为Topic路由信息,Topic路由决定了Topic消息发送到哪些Broker,消费者从哪些Broker消费消息。
路由数据结构的实现代码都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager类中
Namesrv和Zookeeper

5.2. Namesrv架构

Namesrv包含 4个功能模块:Topic路由管理模块、Remoting通信模块、定时任务模块、KV管理模块

Topic路由管理模块:Topic路由决定Topic的分区数据会保存在哪些Broker上。这是Namesrv最核心的模块,Broker启动时将自身信息注册到Namesrv中,方便生产者和消费者获取。生产者、消费者启动和间隔的心跳时间会获取Topic最新的路由信息,以此发送或者接收消息。
Remoting通信模块:是基于Netty的一个网络通信封装,整个RocketMQ的公共模块在RocketMQ各个组件之间担任通信任务。该组件以Request/Response的方式通信,比如你想知道你使用的RocketMQ 支持哪些功能,可以查看org.apache.rocketmq.common.protocol.RequestCode.java,一个RequestCode代表一种功能或者一个接口。
定时任务模块:其实在 Namesrv 中定时任务并没有独立成一个模块,而是由org.apache.rocketmq.namesrv.NamesrvController.initialize()中调用的几个定时任务组成的,其中包括定时扫描宕机的Broker、定时打印KV配置、定时扫描超时请求。
KV管理模块:Namesrv维护一个全局的KV配置模块,方便全局配置,从4.2.0的源代码看,没有发现被使用。
Namesrv启动流程

第一步:脚本和启动参数配置。
启动命令:nohup./bin/mqnamesrv-c./conf/namesrv.conf>/dev/null 2>&1&。通过脚本配置启动基本参数,比如配置文件路径、JVM参数。调用NamesrvStartup.main()方法,解析命令行的参数,将处理好的参数转化为Java实例,传递给NamesrvController实例。
第二步:new 一个 NamesrvController,加载命令行传递的配置参数,调用controller.initialize()方法初始化 NamesrvController。
加载KV配置。主要是从本地文件中加载KV配置到内存中。
初始化Netty通信层实例。RocketMQ基于Netty实现了一个RPC服务端,即NettyRemotingServer。通过参数nettyServerConfig,会启动9876端口监听。
初始化Netty通信层实例。RocketMQ基于Netty实现了一个RPC服务端,即NettyRemotingServer。通过参数nettyServerConfig,会启动9876端口监听。
Namesrv定时打印配置信息到日志中。
第三步:NamesrvController在初始化后添加JVM Hook。Hook中会调用NamesrvController.shutdown()方法来关闭整个Namesrv服务。
第四步:调用NamesrvController.start()方法,启动整个Namesrv。其实start()方法只启动了Namesrv接口处理线程池。
Namesrv停止流程
通常Namesrv的停止是通过关闭命令./mqshutdown namesrv来实现的。
JVM的关机Hook调用关闭了controller,controller.shutdown()方法
关闭Netty服务端,主要是关闭Netty事件处理器、时间监听器等全部已经初始化的组件。
关闭Namesrv接口处理线程池。
关闭全部已经启动的定时任务。
5.3. RocketMQ路由原理
路由注册
Namesrv获取的 Topic路由信息来自 Broker定时心跳,心跳时 Brocker将 Topic信息和其他信息发送到Namesrv。Namesrv通过RequestCode.REGISTER_BROKER接口将心跳中的Broker信息和Topic信息存储在Namesrv中。
在路由信息注册完成后,Broker 会每隔 30s 发送一个注册请求给集群中全部的Namesrv,俗称心跳信,会把最新的 Topic 路由信息注册到 Namesrv 中。
路由剔除
如果Broker长久没有心跳或者宕机,那么Namesrv会将这些不提供服务的Broker剔除。同时生产者和消费者在与 Namesrv 心跳后也会感知被踢掉的 Broker,如此 Broker 扩容或者宕机对生产、消费无感知的情况就处理完了。
如果 Broker 心跳的最后更新时间超过 BROKER_CHANNEL_EXPIRED_TIME(1000×60×2=120s),则将Broker剔除。
两种方式
第一种:Broker 主动关闭时,会调用 Namesrv 的取消注册 Broker 的接口RequestCode=RequestCode.UNREGISTER_BROKER,将自身从集群中删除。
第二种:Namesrv 通过定时扫描已经下线的 Broker,将其主动剔除,实现过程在org.apache.rocketmq.namesrv.NamesrvController.initialize()方法中
6. Broker存储机制

6.1. Broker概述
什么是Broker
Broker是RocketMQ中核心的模块之一,主要负责处理各种TCP请求(计算)和存储消息(存储)
Broker分为Master和Slave。Master主要提供服务,Slave在Master宕机后提供消费服务。
Broker存储目录结构

Commitlog:这是一个目录,其中包含具体的commitlog文件。文件名长度为20个字符,文件名由该文件保存消息的最大物理offset值在高位补0组成。每个文件大小一般是1GB,可以通过mapedFileSizeCommitLog进行配置。
consumequeue:这是一个目录,包含该 Broker 上所有的 Topic 对应的消费队列文件信息。消费队列文件的格式为“./consumequeue/Topic名字/queue id/具体消费队列文件”。每个消费队列其实是commitlog的一个索引,提供给消费者做拉取消息、更新位点使用。
Index:这是一个目录,全部的文件都是按照消息key创建的Hash索引。文件名是用创建时的时间戳命名的。
Config:这是一个目录,保存了当前Broker中全部的Topic、订阅关系和消费进度。这些数据Broker会定时从内存持久化到磁盘,以便宕机后恢复。
abort:Broker 是否异常关闭的标志。正常关闭时该文件会被删除,异常关闭时则不会。当Broker重新启动时,根据是否异常宕机决定是否需要重新构建Index索引等操作。
checkpoint:Broker最近一次正常运行时的状态,比如最后一次正常刷盘的时间、最后一次正确索引的时间等。
Broker启动和停止流程
启动流程

第一步:初始化启动环境。
这是由./bin/mqbroker 和./bin/runbroker.sh 两个脚本来完成的。/bin/mqbroker 脚本主要用于设置 RocketMQ 根目录环境变量,调用./bin/runbroker.sh 进入 RocketMQ 的启动入口
第二步:初始化BrokerController。
初始化主要包含RocketMQ启动命令行参数解析、Broker各个模块配置参数解析、Broker各个模块初始化、进程关机Hook初始化等过程
第三步:启动RocketMQ的各个组件。
组件启动代码在 org.apache.rocketmq.broker.BrokerController.start()方法中
关闭流程

Broker 关闭只是调用 BrokerStartup.java 中注册 JVM Hook 的 BrokerController.shutdown()方法,该方法再调用各个模块关闭方法,最后关闭整个进程。Broker 进程关闭处理完成后,日志输出info信息“Shutdown hook over”。
6.2. Borker存储机制
Broker消息存储结构
org.apache.rocketmq.store.CommitLog类负责处理全部消息的存储逻辑——普通消息、定时消息、顺序消息、未消费的消息和已消费的消息。

CommitLog目录下有多个CommitLog文件。其实CommitLog只有一个文件,为了方便保存和读写被切分为多个子文件,所有的子文件通过其保存的第一个和最后一个消息的物理位点进行连接

Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要加锁,每个 CommitLog 子文件的大小默认是 1GB(1024×1024×1024B),可以通过mapedFileSizeCommitLog进行配置。当一个CommitLog写满后,创建一个新的CommitLog,继续上一个ComiitLog的Offset写操作,直到写满换成下一个文件。所有CommitLog子文件之间的Offset是连续的,所以最后一个CommitLog总是被写入的。
为什么写文件这么快
Page Cache:现代操作系统内核被设计为按照Page读取文件,每个Page默认为4KB。因为程序一般符合局部性原理,所以操作系统在读取一段文件内容时,会将该段内容和附件的文件内容都读取到内核Page中(预读),下次读取的内容如果命中Page Cache就可以直接返回内容,不用再次读取磁盘

Page Cache机制也不是完全无缺点的,当遇到操作系统进行脏页回写、内存回收、内存交换等情况时,就会引起较大的消息读写延迟。对于这些情况,RocketMQ 采用了多种优化技术,比如内存预分配、文件预热、mlock系统调用等,以保证在最大限度地发挥Page Cache 机制的优点的同时,尽可能地减少消息读写延迟。所以在生产环境部署 RocketMq的时候,尽量采用SSD独享磁盘,这样可以最大限度地保证读写性能。
Virtual Memory(虚拟内存):为了保证每个程序有足够的运行空间和编程空间,可以将一些暂时不用的内存数据保存到交换区(其实是磁盘)中,这样就可以运行更多的程序,这种“内存”被称为虚拟内存(因为不是真的内存)。操作系统的可分配内存大小=虚拟内存大小+物理内存大小。
零拷贝和 Java 文件映射:从文件读取流程可以看到,读取到内核态的数据会经历两次拷贝,第一次从内核态内存拷贝到用户态内存,第二次从用户态内存拷贝到 Java 进程的某个变量地址,这样Java变量才能读取数据

java.nio.MappedByteBuffer.java文件中实现了零拷贝技术,即Java进程映射到内核态内存,原来内核态内存与用户态内存的互相拷贝过程就消失了。
Broker消息存储机制
消息存储流程

(1)Broker接收客户端发送消息的请求并做预处理。
(2)Broker存储前预处理消息。
(3)执行DefaultMessageStore.putMessage()方法进行消息校验和存储模块检查。
● 校验存储模块是否已经关闭。
● 校验Broker是否是Slave。
● 校验存储模块运行标记。
● 校验Topic长度。
● 校验扩展信息的长度。
● 校验操作系统Page Cache是否繁忙。
(4)执行 org.apache.rocketmq.store.CommitLog.putMessage()方法,将消息写入CommitLog。
● 设置消息保存时间为当前时间戳,设置消息完整性校验码CRC(循环冗余码)。
● 延迟消息处理。如果发送的消息是延迟消息,这里会单独设置延迟消息的数据字段,比如修改Topic为延迟消息特有的Topic——SCHEDULE_TOPIC_XXXX,并且备份原来的Topic和queueId,以便延迟消息在投递后被消费者消费。
● 获取最后一个 CommitLog 文件实例 MappedFile,锁住该 MappedFile。默认为自旋锁,也可以通过 useReentrantLockWhenPutMessage 进行配置、修改和使用ReentrantLock。
● 校验最后一个 MappedFile,如果结果为空或已写满,则新创建一个 MappedFile返回。
● 调用 MappedFile.appendMessage(final MessageExtBrokerInner msg,final AppendMessageCallback cb),将消息写入MappedFile。
根据消息是单个消息还是批量消息来调用 AppendMessageCallback.doAppend()方法,并将消息写入Page Cache
(1)查找即将写入的消息物理Offset。
(2)事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为0。
(3)序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的Page Cache或 DirectMemory,简称 DM)。特别地,如果将刷盘设置为异步刷盘,那么当ransientStorePoolEnable=true时,会先写入DM,DM中的数据再异步写入文件内存映射的PageCache中。因为消费者始终是从Page Cache中读取消息消费的,所以这个机制也称为“读写分离”。
(4)更新消息所在Queue的位点
内存映射机制与高效写磁盘

RocketMQ在存储设计中通过内存映射、顺序写文件等方式实现了高吞吐
文件刷盘机制
RocketMQ支持两种刷盘方式,在Broker启动时配置flushDiskType=SYNC_FLUSH表示同步刷盘,配置flushDiskType=ASYNC_FLUSH表示异步刷盘。
同步刷盘

GroupCommitService就是org.apache.rocketmq.store.CommitLog.GroupCommitService——同步刷盘服务。
过程

如何将刷盘结果通知存储数据线程的?

异步刷盘
配置读写分离(异步转存服务)

CommitRealTimeService 就是org.apache.rocketmq.store.CommitLog.CommitRealTimeService——异步转存服务。Broker通过配置读写分离将消息写入直接内存(Direct Memory,简称 DM),然后通过异步转存服务,将DM 中的数据再次存储到 Page Cache中,以供异步刷盘服务将Page Cache刷到磁盘中
过程

在异步转存服务和存储服务把消息写入Page Cache后,由异步刷盘服务将消息刷入磁盘中

未配置读写分离(没有异步转存)

FlushRealTimeService 就是org.apache.rocketmq.store.CommitLog.FlushRealTimeService——异步刷盘服务。
读写分离机制
Broker Master-Slave读写分离:写消息到Master Broker,从Slave Broker读取消息。Broker 配置为 slaveReadEnable=True (默认 False),消息占用内存百分比配置为accessMessageInMemoryMaxRatio=40(默认)。

Broker Direct Memory-Page Cache读写分离:写消息到Direct Memory(直接内存,简称DM),从操作系统的 Page Cache 中读取消息。Master Broker 配置读写分离开关为transientStorePoolEnable=True(默认False),写入DM存储数量,配置transientStorePoolSize至少大于 0(默认为 5,建议不修改),刷盘类型配置为flushDiskType=FlushDiskType.ASYNC_FLUSH,即异步刷盘。

6.3. CommitLog索引机制
索引的数据结构
Consumer Queue

消费队列,主要用于消费拉取消息、更新消费位点等所用的索引。源代码可以参考文件org.apache.rocketmq.store.ConsumeQueu
Index File

是一个RocketMQ实现的Hash索引,主要在用户用消息key查询时使用,该索引是通过org.apache.rocketmq.store.index.IndexFile类实现的。
每个Index File文件包含文件头、Hash槽位、索引数据。每个文件的Hash槽位个数、索引数据个数都是固定的。Hash 槽位可以通过 Broker 启动参数 maxHashSlotNum进行配置,默认值为500万;索引数据可以通过Broker启动参数maxIndexNum进行配置,默认值为500万×4=2000万,一个Index File约为400MB。

索引的创建过程
Consume Queue和Index File两个索引都是由org.apache.rocketmq.store.ReputMessage Service类创建的

步骤

第一步:从CommitLog中查找未创建索引的消息,将消息组装成DispatchRequest对象,该逻辑主要在org.apache.rocketmq.store.CommitLog.checkMessageAndReturnSize()方法中实现。
第二步:调用 doDispatch()方法,该方法会循环多个索引处理器(这里初始化了CommitLogDispatcherBuildConsumeQueue 和 CommitLogDispatcherBuildIndex 两个索引处理器)并调用索引处理器的dispatch()方法来处理DispatchRequest。
索引创建失败怎么办
Consume Queue和Index File每次刷盘时都会做Checkpoint操作,Broker每次重启的时候可以根据Checkpoint信息得知哪些消息还未创建索引。
索引如何使用
按照位点查消息

第一步:拉取前校验。校验DefaultMessageStore服务是否已经关闭(正常关闭进程时会被关闭),校验DefaultMessageStore服务是否可读。
第二步:根据 Topic 和 queueId 查找 ConsumeQueue 索引映射文件。判断根据查找到的ConsumeQueue索引文件校验传入的待查询的位点值是否合理,如果不合理,重新计算下一次可以拉取的位点值。
第三步:循环查询满足maxMsgNums条数的消息。循环从ConsumeQueue中读取消息物理位点、消息大小和消息Tag的Hash值。先做Hash过滤,再使用过滤后的消息物理位点到CommitLog中查找消息体,并放入结果列表中。
第四步:监控指标统计,返回拉取的消息结果。
按照时间段查消息
第一步:查找这个Topic 的所有Queue。
第二步:在每一个队列中查找起始时间、结束时间对应的起始 offset 和最后消息的offset。
第三步:根据起始位点、最后消息位点和 Topic,循环拉取所有 Queue就可以拉取到消息。
按照key查询消息
如果通过设置messageIndexEnable=True(默认是True)来开启Index索引服务,那么在写入消息时会根据 key自动构建 Index File索引。用户可以通过Topic和key查询消息,查询方法为org.apache.rocketmq.store.ConsumeQueue.queryMessage(String Topic,String key,intmaxNum,long begin,long end)。queryMessage()方法的查询过程与按照位点查询消息的过程类似
第一步:调用 indexService.queryOffset()方法,通过 Topic、key 查找目标消息的物理位点信息。
第二步:根据物理位点信息在CommitLog中循环查找消息体内容。
第三步:返回查询结果。
6.4. 过期文件删除机制
RocketMQ 通过设置数据过期时间来删除额外的数据文件,具体的实现逻辑是通过org.apache.rocketmq.store.DefaultMessageStore.start()方法启动的周期性执行方法DefaultMessageStore.this.cleanFilesPeriodically()来实现的。
CommitLog文件的删除过程
CommitLog文件由org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService类提供的一个线程服务周期执行删除操作
删除操作执行条件
第一,当前时间等于已经配置的删除时间。
第二,磁盘使用空间超过85%。
第三,手动执行删除(开源版本RocketMQ 4.2.0不支持)。
Consume Queue、Index File文件的删除过程
Consume Queue和Index File都是索引文件,在CommitLog文件被删除后,对应的索引文件其实没有存在的意义,并且占用磁盘空间,所以这些文件应该被删除。RocketMQ的删除策略是定时检查,满足删除条件时会删除过期或者无意义的文件。
通过查看代码我们得知,最终程序调用CleanConsumeQueueService.deleteExpiredFiles()方法来删除索引文件
对于Index File索引文件,则是通过调用indexService.deleteExpiredFile()方法进行删除的
6.5. 主从同步机制
Broker主从同步数据有两种:配置数据和消息数据。配置数据主要包含Topic配置、消费者位点信息、延迟消息位点信息、订阅关系配置等。Broker 主从同步的逻辑是通过 org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll()方法实现的。该方法在org.apache.rocketmq.broker.BrokerController.initialize()方法中被初始化,每60s同步一次,并且同步周期不能修改。消息数据是生产者发送的消息,保存在 CommitLog 中,由 HAService 服务实时同步到SlaveBroker中。所有实现类都在org.apache.rocketmq.store.ha 包下。 
配置数据同步流程
配置数据包含 4 种类型:Topic 配置、消费者位点、延迟位点、订阅关系配置。 
第一步:Master Broker在启动时,初始化一个BrokerOuterAPI,这个服务的功能包含Broker注册到Namesrv、Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等。
第二步:Slave Broker在初始化Controller的定时任务时,会初始化SlaveSynchronize服务,每60s调用一次SlaveSynchronize.syncAll()方法。
第三步:syncAll()方法依次调用4种配置数据(Topic配置、消费者位点、延迟位点、订阅关系配置)的同步方法同步全量数据
第四步:syncAll()中执行的4个方法都通过Remoting模块同步调用BrokerOuterAPI,并从Master Broker获取数据,保存到Slave中。
第五步:Topic 配置和订阅关系配置随着保存内存信息的同时持久化到磁盘上;消费者位点通过BrokerController 初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService定时将内存持久化到磁盘上
CommitLog数据同步流程
CommitLog 的数据同步分为同步复制和异步复制两种。同步复制是生产者生产消息后,等待MasterBroker将数据同步到Slave Broker后,再返回生产者数据存储状态;异步复制是生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果 
(1)异步复制。Master Broker启动时会启动HAService.AcceptSocketService服务,当监听到来自 Slave 的注册请求时会创建一个 HAConnection,同时 HAConnection 会创建ReadSocketService和WriteSocketService两个服务并启动,开始主从数据同步。ReadSocketService接收Slave同步数据请求,并将这些信息保存在HAConnection中。WriteSocketService根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave。
(2)同步复制。在 CommitLog 将消息存储到 Page Cache 后,会调用 CommitLog.handleHA()方法处理同步复制
6.6. 关机恢复机制
关机恢复概述
Broker关机恢复是指恢复CommitLog、Consume Queue、Index File等数据文件。Broker关机分为正常调用命令关机和异常被迫进程终止关机两种情况。恢复过程的设计目标是使正常停止的进程实现零数据丢失,异常停止的进程实现最少量的数据丢失。与关机恢复相关的主要文件有两个:abort和checkpoint。
abort是一个空文件,标记当前Broker是否正常关机,Broker进程正常启动的时候,创建该文件。Broker进程正常停止后,该文件会被删除;如果异常退出,则文件依旧存在

checkpoint是检查点文件,保存Broker最后正常存储各种数据的时间,在重启Broker时,恢复程序知道从什么时刻恢复数据。检查点逻辑由 org.apache.rocketmq.store.StoreCheckpoint类实现。

关机恢复流程

第一步:Broker异常退出检查。如果abort文件存在,说明上次是异常退出的。
第二步:加载延迟消息的位点信息。ScheduleMessageService 服务通过继承和重写ConfigManager,调用 load()方法从磁盘加载延迟位点文件的内容,并根据配置项messageDelayLevel初始化延迟级别。
第三步:加载全部CommitLog文件。通过读取CommitLog目录下的所有文件,依次加载每个 CommitLog 为 MappedFile,并且设置写指针、已刷盘指针、已提交指针,使所有指针都指向该文件的最末位。
第四步:加载全部 Consume Queue 文件及数据。调用loadConsumeQueue方法,读取./consumequeue/Topic/queueId/目录,加载全部Topic、queueId作为ConsumeQueue对象,再调用load方法初始化每一个ConsumeQueue。
第五步:初始化 Checkpoint 文件为 StoreCheckpoint 对象,并且初始化三个数据:physicMsgTimestamp、logicsMsgTimestamp和indexMsgTimestamp。
第六步:加载Index File索引。加载./index目录下的全部索引文件,如果上次进程异常退出并且索引文件操作的最后时间戳大于 Checkpoint 中保存的时间,则说明当前文件有部分数据可能存在错误,须立即销毁文件。
第七步:恢复全部数据。
7. 事务消息与延迟消息机制
7.1. 事务消息概述
事务消息的实现方案目前主要分为两种:两阶段提交方案和三阶段提交方案。RocketMQ 采取了两阶段提交的方案进行实现。
分布式事务是指在多个系统或多个数据库中的多个操作要么全部成功,要么全部失败,并且需要满足ACID四个特性。
7.2. 事务消息机制
将事务消息的发送和处理总结为四个过程:生产者发送事务消息和执行本地事务、Broker存储事务消息、Broker回查事务消息、Broker提交或回滚事务消息。
生产者发送事务消息和执行本地事务
事务消息的发送过程分为两个阶段:第一阶段,发送事务消息;第二阶段,发送endTransaction消息。 
发送过程的实现类org.apache.rocketmq.client.producer.TransactionMQProducer。
transactionListener:事务监听器,主要功能是执行本地事务和执行事务回查。事务监 听 器 包 含executeLocalTransaction()和checkLocalTransaction() 两个方法。executeLocalTransaction()方法执行本地事务,checkLocalTransaction()方法是当生产者由于各种问题导致未发送Commit或Rollback消息给Broker时,Broker回调生产者查询本地事务状态的处理方法。
executorService:Broker回查请求处理的线程池。
start():事务消息生产者启动方法,与普通启动方法不同,增加了this.defaultMQProducerImpl.initTransactionEnv()的调用,即增加了初始化事务消息的环境信息
shutdown():关闭生产者,回收生产者资源。该方法是启动方法的逆过程,功能是关闭生产者、销毁事务环境。销毁事务环境是指销毁事务回查线程池,清除回查任务队列。
(1)发送Half消息的过程。
第一步:数据校验。判断transactionListener的值是否为null、消息Topic为空检查、消息体为空检查等。
第二步:消息预处理。预处理的主要功能是在消息扩展字段中设置消息类型。
第三步:发送事务消息。调用同步发送消息的方法将事务消息发送出去。
第四步:执行本地事务。消息发送成功后,执行本地事务。
(2)发送Commit或Rollback消息。
在本地事务处理完成后,根据本地事务的执行结果调用 DefaultMQProducerImpl.endTransaction()方法,通知Broker进行Commit或Rollback
当前Half消息发送完成后,会返回生产者消息发送到哪个Broker、消息位点是多少,再根据本地事务的执行结果封装 EndTransactionRequestHeader 对象,最后调用MQClientAPIImpl.endTransactionOneway()方法通知Broker进行Commit或Rollback。
Broker存储事务消息
在 Broker 中,事务消息的初始化是通过 BrokerController.initialTransaction()方法执行的
(1)TransactionalMessageService
事务消息主要用于处理服务,默认实现类是TransactionalMessageServiceImpl。如果想自定义事务消息处理实现类,需要实现 TransactionalMessageService 接口,然后通过ServiceProvider.loadClass()方法进行加载。
(2)transactionalMessageCheckListener
事务消息回查监听器,默认实现类是DefaultTransactionalMessageCheckListener。如果想自定义回查监听处理,需要继承AbstractTransactionalMessageCheckListener接口,然后通过ServiceProvider.loadClass()方法被加载。
(3)transactionalMessageCheckService
事务消息回查服务是一个线程服务,定时调用 transactionalMessageService.check()方法,检查超时的Half消息是否需要回查。
Broker 存储事务消息和普通消息都是通过org.apache.rocketmq.broker.processor.SendMessageProcessor类进行处理的,只是在存储消息时有两处事务消息需要单独处理。
第一个单独处理:判断是否是事务消息,处理方法SendMessageProcessor.sendMessage(),这里获取消息中的扩展字段 MessageConst.PROPERTY_TRANSACTION_PREPARED的值,如果该值为 True则当前消息是事务消息;再判断当前 Broker的配置是否支持事务消息,如果不支持就返回生产者不支持事务消息的信息;如果支持,则调用 TransactionalMessageService.prepareMessage()方法保存Half消息。
第二个单独处理:存储前事务消息预处理,处理方法是org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge.parseHalfMessageInner(),功能是将原消息的 Topic、queueId、sysFlg 存储在消息的扩展字段中,并且修改Topic的值为RMQ_SYS_TRANS_HALF_TOPIC,修改queueId的值为0。然后,与其他消息一样,调用DefaultMessageStore.putMessage()方法保存到CommitLog中。
Broker回查事务消息
回查是Broker发起的,Broker认为在接收Half消息后的一段时间内,如果生产者都没有发送Commit或Rollback消息给Broker,那么Broker会主动“询问”生产者该事务消息对应的本地事务执行结果,以此来决定事务是否要Commit。
回查的实现逻辑是每间隔一定时间执行 TransactionalMessageServiceImpl.check()方法,判断哪些消息超时,对超时的消息开始执行回查。
两个关键主题
RMQ_SYS_TRANS_HALF_TOPIC:保存事务消息的Topic,它存储用户发送的Half消息,有的消息已经进行了 Commit,有的消息已经进行了 Rollback,有的消息状态是未知的。
RMQ_SYS_TRANS_OP_HALF_TOPIC:也叫 OP 主题,当事务消息被 Commit 或Rollback后,会将原始事务消息的offset保存在该OP主题中。
回查流程
第一步:回查前校验。如果当前回查执行的时间超过了最大允许的执行回查时间,则跳出当前回查过程;如果当前回查的消息已经执行了Commit/Rollback,则忽略当前消息,直接回查下一条消息
第二步:检查是否有消息需要回查。如果从RMQ_SYS_TRANS_HALF_TOPIC主题中获取 Half 消息为空的次数超过允许的最大次数或者没有消息,那么表示目前没有需要再回查的消息了,可以结束本次回查过程。当然如果传入的位点是非法的,则继续下一个回查的位点。
第三步:回查次数检验,消息是否过期检验。如果 Half 消息回查次数已经超过了允许的最大回查次数,则不再回查,实现该校验的方法是 TransactionalMessageServiceImpl.needDiscard();如果 Half 消息对应的 CommitLog 已经过期,那么也不回查,该校验实现的方法是TransactionalMessageServiceImpl.needSkip()。
第四步:新发送的 Half消息不用回查,对于不是新发送的 Half消息,如果在免疫回查时间(免疫期)内,也不用回查。免疫期是指生产者在发送Half消息后、执行Commit/Rollback前,Half消息都不需要回查,这段时间就是这个Half消息的回查免疫期。
第五步:最终判断是否需要回查生产者本地事务执行结果。
回查条件
● 如果没有OP消息,并且当前Half消息在免疫期外。
● 当前Half消息存在OP消息,并且最后一个本批次OP消息中的最后一个消息在免疫期外,也就是满足回查时间。
● Broker与客户端有时间差。
● 重新将当前 Half 消息存储在 RMQ_SYS_TRANS_HALF_TOPIC 主题中。为什么需要重新存储呢?因为回查是一个异步过程,Broker不确定回查的结果是成功还是失败,所以RocketMQ做最坏的打算,如果回查失败则下次继续回查;如果本次回查成功则写入OP消息,下次再读取Half消息时也不会回查。
第六步:执行回查。在当前批次的 Half 消息回查完毕后,更新 Half 主题和 OP 主题的消费位点,推进回查进度。Broker将回查消息通过回查线程池异步地发送给生产者,执行事务结果回查。
Broker提交或回滚事务消息
当生产者本地事务处理完成并且 Broker 回查事务消息后,不管执行 Commit 还是Rollback,都会根据用户本地事务的执行结果发送一个 End_Transaction 的 RPC 请求给Broker,Broker 端处理该请求的类是 org.apache.rocketmq.broker.processor.EndTransaction Processor
第一步:End_Transaction请求校验。主要检查项如下。
● Broker角色检查。Slave Broker不处理事务消息。
● 事务消息类型检查。EndTransactionProcessor 只处理 Commit 或 Rollback 类型的事务消息,其余消息都不处理。
第二步:进行 Commit 或 Rollback。根据生产者请求头中的参数判断,是 Commit 请求还是Rollback 请求,然后分别进行处理。
7.3. 延迟消息概述
什么是延迟消息呢?延迟消息也叫定时消息,一般地,生产者在发送消息后,消费者希望在指定的一段时间后再消费。常规做法是,把信息存储在数据库中,使用定时任务扫描,符合条件的数据再发送给消费者。
RocketMQ延迟消息是通过org.apache.rocketmq.store.schedule.ScheduleMessageService类实现的
7.4. 延迟消息机制
延迟消息存储机制
delayTimeLevel:是发送消息时可以设置的延迟级别,如果该值大于0,则表示当前处理的消息是一个延迟消息。
● 将原始 Topic、queueId 备份在消息的扩展字段中,全部的延迟消息都保存在ScheduleMessageService.SCHEDULE_TOPIC的Topic中。
● 备份原始Topic、queueId为延迟消息的Topic、queueId。备份的目的是当消息到达投递时间时会恢复原始的Topic和queueId,继而被消费者拉取并消费。
巧妙地设计:在 CommitLog 中查询出消息后,调用computeDeliverTimestamp()方法计算消息具体的投递时间,再将该时间保存在 ConsumeQueue的tagCode中。
这样设计的好处是,不需要检查 CommitLog 大文件,在定时任务检查消息是否需要投递时,只需检查Consume Queue中的tagCode(不再是消息Tag的Hash值,而是消息可以投递的时间,单位是ms),如果满足条件再通过查询CommitLog将消息投递出去即可。如果每次都查询CommitLog,那么可想而知,效率会很低。
延迟消息投递机制
通过定时服务定时扫描Consume Queue,满足投递时间条件的消息再通过查询CommitLog将消息重新投递到原始的Topic中,消费者就可以接收消息了。
第一步:查找Consume Queue。两个核心方法:queueId2DelayLevel()和delayLevel2QueueId(),RocketMQ设计的延迟级别和延迟 Topic的 queueId有关系,可以互相转化。
第二步:找到投递时间。也即Consume Queue中存储的tagCode位置数据。
第三步:如果满足投递时间条件,则重新发送消息到原始Topic中。在重新投递前调用messageTimeup()方法,将消息的原始Topic、queueId、tagCode等还原,清除扩展字段中延迟消息的标志(MessageConst.PROPERTY_DELAY_TIME_LEVEL),然后被重新投递、更新消费位点。重新投递后,消息会正常创建Consume Queue索引、Index File索引,然后被消费者拉取消费,达到定时消费的目的。
第四步:如果第三步投递失败,或者消息没有达到投递时间条件,则重新提交一个定时任务到timer中,以供下次检查。
8. RocketMQ源代码阅读
8.1. 源代码结构概述
V4.2.0

V4.6.0

8.2. 源代码编译
第一步:进入RocketMQ代码根目录,执行命令:mvn-Dmaven.test.skip=true clean package,并编译。
第二步:打开IDEA,选择Open项目。为了加速Maven下载,可以事先为Maven设置一个国内镜像仓库地址。
第三步:查看打开结果。这一步耗时较长,下载Maven依赖需要一些时间。
8.3. 如何阅读源代码
第一阶段:实际需要?没听过 RocketMQ,或者听过但是没用过。这个时候完全不适合直接读源代码。可能你是刚入行的小白、久经考验的大神,在完全不了解RocketMQ的情况下直接读源代码意义不大。这个阶段强烈建议先写一写生产者、消费者的代码,在有了解决实际问题的经验后,如果发现自己还想多了解一点,此时你可以尝试进入第二阶段。
第二阶段:疯狂!使用RocketMQ进行生产、消费有一段时间了,发现一些东西总是解释不清楚,懵懵懂懂的,这时需要有针对性地读取某个功能点的源代码。当然,初次阅读其实也是蛮困难的,RocketMQ 是一个体系,即使单独一个功能也会涉及广泛的知识,大部分人就是卡在阅读源代码这里,然后就放弃了。如果还是想继续,那么就硬着头皮往下看。
第三阶段:柳暗花明!如果第二阶段通过了,恭喜你!本阶段可以使用调试阅读法。比如你想阅读消费者如何在 Broker拉取消息的源代码,可以在本地 IDEA启动 Broker 进程后,找到PullMessageProcessorTest,执行单元测试,一步步跟进去看看Broker究竟是怎么实现的。
第四阶段:同行业交流!可以在 GitHub或 Stack OverFlow上回答一些 RocketMQ相关的issue和问题,帮助自己理解RocketMQ,也可以从别人的角度重新审视RocketMQ。
第五阶段:体系化阅读。针对一个功能点,深入查看初始化、运行流程设计、运行流程和上下文的关系。
第六阶段:反思!如果在阅读完成几遍后,你不由自主地思考当初作者这么实现的原因,你会发现面试题来了:线程安全队列、阻塞队列、事件回调、NIO、二分折半查找、各种设计模式等。作者都用上了,不是为了用而用,而是真正可以提高代码效率。原来“数据结构与算法”诚不我欺也。
第七阶段:重新循环第四阶段,直到自己认为可以进行第八阶段。
第八阶段:加入Apache RocketMQ社区,把你了解的知识分享出去,也可以从社区大神那里取经。
第九阶段:重复第四阶段。
8.4. 阅读范例:通过消息id查询消息

9. RocketMQ企业最佳实践
9.1. 落地概述
为什么选择RocketMQ
稳定可靠、运维与管理方便、低成本、性能考量。
业务需求永远是第一需求。对业务而言,组件能够稳定、可靠、可持续地保障业务正常流转肯定是首要的需求。
在经历过阿里巴巴、VIPKID、蚂蚁金服、微众银行等对于稳定性有极致需求的大厂洗礼后,稳定、可靠成为了RocketMQ的代言词。
作为技术人员,一款产品的管理、运维和开发的难易程度也是选型的重要参考指标。
RocketMQ 使用 Java 语言开发,做二次开发或集成到现有系统难度较低。RocketMQ社区也相当活跃,文档包含中英文,对国内开发者非常友好。同时,社区提供的管理平台功能完善,更新也比较活跃,RocketMQ的衍生产品也非常丰富,比如rocketmq-connect-es、rocketmq-flink、rocketmq-hbase等。
成本

性能
2020年互联网行业仍然是流量的行业,高并发、大吞吐是当前面临的最大挑战之一。RocketMQ 在阿里巴巴内部各个平台都广泛使用,并且成功支撑了多个双11万亿级别的消息流量,充分证明了RocketMQ强大的吞吐能力。
如何做RocketMQ的集群管理
RocketMQ 的集群管理主要是对集群中 Broker、Namesrv、Topic、生产者、消费者进行统一管控。RocketMQ本身提供了一系列的管理接口,具体实现在org.apache.rocketmq.tools.admin.DefaultMQAdminExt.java文件中
管理API
  
9.2. 集群管理
Topic管理

创建Topic
创建Topic的核心在于,你需要知道Topic和队列(Queue)的关系,也叫Topic路由分布。
进入Topic管理页面,单击“新增/更新”按钮
集群名:RocketMQ 集群名,和下一个选项 BROKER_NAME 二者选其一,如果选择集群名,则表示这个新建Topic的Queue会分布在这个集群的全部Broker中。
BROKER_NAME:RocketMQ Broker的名字,选择后新建Topic的Queue会分布在选中的Broker机器上。集群名和BROKER_NAME只需选择一个就可以。
主题名:Topic名字,字符串长度不能超过255,不能与系统默认名字冲突,还需要满足正则表达式:^[%|a-zA-Z0-9_-]+$。
读/写队列数量:每台Broker上Queue的数量,建议设置为相同值。
perm:Topic 权限,这是一个常量,该常量的定义在org.apache.rocketmq.common.constant.PermName类中。2表示只写,4表示只读,6表示读写均可。
创建完成后,检查创建的Topic是否和我们预期的一致。刷新页面后,搜索刚刚创建的topicA,单击“路由信息”按钮,可以看到topicA的路由信息
Topic扩容
在实际业务中发现当前的发送效率已经不能满足业务需求,那么扩容是一种常用的处理手段。 
第一步:部署新的master broker 3、master broker 4机器
第二步:回到RocketMQ Console平台的Topic管理页面,单击“新增/更新”按钮。
第三步:在BROKER_NAME输入框中选择新加的Broker。在输入Queue数量时要注意读、写 Queue 的数值表示在单台 Broker 中的数量,如果想将一半的流量分流到新的 Broker中,那么读、写 Queue数量都填 2。扩容后 topicA 的发送效率将增加一倍,新旧2组机器压力承担比例为1:1
Topic权限更改
如果扩容后Master Broker 4机器有问题,那么理论上topicA会有1/4的消息发送失败,此时可以更改topicA在Master Broker 4上的权限为0,表示不可读、不可写
Topic重置消费位点
重置消费位点,也叫消息重放,表示想要重新消费以前的消息。
RocketMQ-Console 支持将指定消费者组的消费位点重置到某一个时间点,这样从指定的时间点到现在的所有消息都会重新投递给这个消费者组消费。
注意事项
(1)回放消息的发送时间和实际消息的发送时间及顺序都不同。
比如之前发送的消息是按照msg1、msg2顺序发送并消费的,通过重置消费位点重新消费时,可能是按照msg2、msg1顺序发送的。
(2)重置时间最多不超过CommitLog保存时间。
如果CommitLog设置保存7天,那么最多只能重新消费7天的消息。
消费者管理

创建/更新消费者组

clusterName:集群名,如果选择一个集群,表示当前创建的消费者组可以消费这个集群中全部Broker的Topic。
brokerName:Broker名字,可以选多个。与clusterName字段,两者任意选择一个即可。此时必须选择topicA所在的2个Broker机器:Master Broker 1和Master Broker 2。
groupName:消费者组名,字符串长度不能超过255,不能和系统默认名字冲突,还需要满足正则表达式:^[%|a-zA-Z0-9_-]+$。此处输入consumerGroupA。
consumeEnable:消费开关,True表示允许正常消费,False表示暂停消费者组中的全部消费者实例的消费消息。
consumeBroadcastEnable:广播开关,True 表示当前消费者支持广播消费,False 表示支持集群消费。
retryQueueNums:重试Topic的队列数,默认为1。
brokerId:0表示Master Broker,1表示Slave Broker。
whichBrokerWhenConsumeSlowly:如果从当前 Broker 消费消息慢,那么从哪个Broker消费能够分担消费压力。默认为1,表示从Slave Broker消费。
增加消费吞吐量
(1)加快业务代码消费,此为根本,但没有通用的解决办法,都需要具体问题具体分析,这里暂时不做过多讲解。
(2)增加单个消费者的消费线程数。
consumeThreadMin、consumeThreadMax 表示最小、最大消费线程数。一般地consumeThreadMin小于或等于consumeThreadMax。consumeThreadMin其实是消费线程池的核心线程数,这个值越大,说明有越多的回调线程可以同时回调给消费代码进行消费。
(3)扩容消费者数。
如果增加单个消费者吞吐量后,消息堆积还是很多,那么可以横向扩展消费者实例数,以此增加消费吞吐量。但是消费者数也不能无限增加,一个Topic的队列只能被一个消费者实例消费,如果超过则会产生消费者“空消费”。
订阅关系不一致的排查
订阅关系一致表示消费者组、Topic、标签在一个消费者组中须保持一致
不一致的表现
(1)通过查看消费详情,可以看到消费者客户端有空白项,多打开几次发现空白项的客户端位置在发生变化。
(2)通过查看消费详情,可以看到消费的主题数也在不断变化,有时包含topicA,有时不包含。
如何排查
单击“终端”按钮,会弹出该消费者组全部的订阅关系。如果出现订阅关系不一致时,多打开几次看到的结果就会不一样,正常情况下是完全一致的。
9.3. 集群监控和报警
监控和报警架构
对于RocketMQ的监控分为硬件监控和软件监控。硬件监控一般是机器内存使用率、CPU使用率、磁盘使用率等主机监控;软件监控是Namesrv、Broker、客户端生产、消费的指标数据监控和报警。 
RocketMQ Exporter:Apache RocketMQ社区提供的标准Prometheus Exporter,可以将全部RocketMQ监控指标导出到Prometheus中。
Prometheus:中文名叫普罗米修斯,可以通过 Exporter 获取监控指标,Prometheus可以保存、查询指标,并提供报警配置。
Grafana:是一个开源的数据分析和展示平台。
Prometheus Alert Manager:Prometheus官方提供的报警组件,可以和Prometheus配合报警。Alert Manager可以配置多种监控方式,笔者推荐基于 Webhook的报警配置。
自研报警Webhook:基于Prometheus Alert Manager Webhook报警的一种实现。如何实现可以参考Prometheus Alert Manager官方的相关文档。
基于Grafana监控
RocketMQ社区提供了一个RocketMQ Prometheus Exporter,可以帮助我们快速地搭建一套监控体系
第一步:从GitHub下载源代码。
第二步:修改 Namesrv 地址。将配置文件./resources/application.properties 中的rocketmq.config.namesrvAddr配置项修改为你的Namesrv集群地址。如果有多个地址,则用分号隔开。
第三步:执行命令maven clean package。然后,在target目录下找到编译结果。
第四步:修改application.yml中的rocketmq.config.namesrvAddr配置项的值为需要监控的RocketMQ集群的Namesrv地址,启动jar,通过访问localhost:5557/metrics可以看到exporter采集的数据指标。
第五步:在Prometheus的配置文件中配置exporter。

第六步:在Grafana中添加Prometheus为数据源后,添加消费者堆积监控曲线表达式如下:rocketmq_group_diff{group=″消费者组名″,topic=″消费者订阅的topic″}。
基于Prometheus的报警
首先,需要在Prometheus中配置如下报警规则
 
然后,RocketMQ Exporter将监控指标上报到 Prometheus,Prometheus会根据报警规则计算当前上报数据是否命中规则条件。如果命中则根据在Prometheus Alert Manager中配置的如下所示的web.hook告警,将当前上报的指标信息包装成报警信息发送给web.hook中的API地址

9.4. 集群迁移
Namesrv机器之间因为是无状态的,所以迁移相对简单,在这里我们主要介绍如何迁移Broker集群。 Broker集群迁移可以采用 Topic扩容的方式进行。
第一阶段
新集群准备:需要完成新 Broker 部署、Broker 服务器主机监控、Broker 指标监控、Broker功能测试、Broker性能测试。务必保证新集群能被监控,务必保证新集群的可用性和性能,务必保证迁移到新集群后原有的吞吐量和支持能力不会下降,否则很难保证对业务无影响。
第二阶段
Topic 迁移:将原集群中的全部 Topic 信息按照原有配置(配置主要包含队列数量、Topic权限等)读出,写入新集群机器。
消费者组迁移:将原集群中全部消费者组信息按照原有配置(配置主要包含订阅的Topic所在的Broker机器、消费者组名字、广播设置等)全部读出,写入新集群。 迁移完 Topic 和消费者组后,Topic 相当于全部扩容,新消息会写入新、旧集群,消费也会在新、旧两个集群中进行。
第三阶段
禁写原集群:如何禁写可以查看Topic 权限部分,将原有集群的全部 Broker中的Topic依次设置为不可写、可读。这样生产者在进行Rebalance后,就不再向旧Broker写入消息;同时消费者在进行Rebalance后,将同时从新集群、旧集群消费信息。 当旧集群消息消费完毕后,将旧集群中的全部Topic设置为不可读、不可写,生产者再次进行Rebalance 后,只向新集群写消息;同时消费者再次进行 Rebalance 后,也只从新集群消费消息。自此,生产消费完全透明地迁移到了新集群。
原集群校验与下线:在确认原集群没有任何读写流量后,关闭原集群报警,下线原有Broker机器。
9.5. 测试环境实践
“抢消费”问题
解决方案1
每一个子环境启动一套RocketMQ集群,这样每个环境完全独立,互相不干扰。
比较简单,但是成本比较高。
解决方案2
每一个子环境创建一套订阅关系。
操作相对麻烦,但是成本比较低。
9.6. 接入实践
Spring接入RocketMQ
第一步:添加maven依赖。

第二步:添加生产者spring配置。

第三步:添加生产者代码。

第四步:消费代码。
消费者主要使用RocketMQMessageListener接口进行监听配置。 RocketMQMessageListener有哪些常用配置。 (1)消费配置 String consumerGroup():消费者组。 String topic():Topic名字。 SelectorType selectorType() default SelectorType.TAG:过滤方式,默认为Tag过滤。 String selectorExpression() default″*:过滤值,默认为全部消费,不过滤。 ConsumeMode consumeMode():消费模式,有顺序消费、并发消费。 MessageModel messageModel():消息模式,有集群消费、广播消费。 int consumeThreadMax() default 64:最大消费线程数,默认为64。 long consumeTimeout() default 30000L:消费超时,默认为30s。 (2)ACL权限配置 String accessKey() default ACCESS_KEY_PLACEHOLDER String secretKey() default SECRET_KEY_PLACEHOLDER String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER; (3)消息轨迹配置 boolean enableMsgTrace() default true:是否开启消息轨迹,默认打开。 String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER:保存消息轨迹的Topic。 String nameServer() default NAME_SERVER_PLACEHOLDER:消费Namesrv地址。
10. 附录
10.1. RocketMQ 4.2.0 Namesrv基本配置参数
 
10.2. RocketMQ4.2.0 Broker基本配置参数
     
10.3. RocketMQ Exporter核心指标说明
    
RockectMQ面试题
01.为什么要用RocketMq?
优势
吞吐量高:单机吞吐量可达十万级
可用性高:分布式架构
消息可靠性高:经过参数优化配置,消息可以做到0丢失
功能支持完善:MQ功能较为完善,还是分布式的,扩展性好
支持10亿级别的消息堆积:不会因为堆积导致性能下降
源码是java:方便我们查看源码了解它的每个环节的实现逻辑,并针对不同的业务场景进行扩展
可靠性高:天生为金融互联网领域而生,对于要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
稳定性高:RoketMQ在上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验
02.RocketMq的部署架构了解吗?

NameServer集群
NameServer 担任路由消息的提供者。
生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表分别进行发送消息和消费消息。
nameServer由多个无状态的节点构成,节点之间无任何信息同步
每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息
Producer集群
Producer负责生产消息,一般由业务系统负责生产消息。
一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要
Cosumer集群
Consumer负责消费消息,一般是后台系统负责异步消费。
一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费
Broker集群
Broker,消息中转角色,负责存储消息、转发消息。
在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
broker会定期向NameServer以发送心跳包的方式,轮询向所有NameServer注册以下元数据信息
1)broker的基本信息(ip port等)
2)主题topic的地址信息
3)broker集群信息
4)存活的broker信息
5)filter 过滤器
03.它有哪几种部署类型?分别有什么特点?
1)单Master
单机模式, 即只有一个Broker, 如果Broker宕机了, 会导致RocketMQ服务不可用, 不推荐使用
2)多Master模式
优点
组成一个集群, 集群每个节点都是Master节点, 配置简单, 性能也是最高, 某节点宕机重启不会影响RocketMQ服务
缺点
如果某个节点宕机了, 会导致该节点存在未被消费的消息在节点恢复之前不能被消费
3)多Master多Slave模式,异步复制
每个Master配置一个Slave, 多对Master-Slave, Master与Slave消息采用异步复制方式, 主从消息一致只会有毫秒级的延迟
优点
弥补了多Master模式(无slave)下节点宕机后在恢复前不可订阅的问题。在Master宕机后, 消费者还可以从Slave节点进行消费。采用异步模式复制,提升了一定的吞吐量。总结一句就是,采用多Master多Slave模式,异步复制模式进行部署,系统将会有较低的延迟和较高的吞吐量
缺点
如果Master宕机, 磁盘损坏的情况下, 如果没有及时将消息复制到Slave, 会导致有少量消息丢失
4)多Master多Slave模式,同步双写
与多Master多Slave模式,异步复制方式基本一致,唯一不同的是消息复制采用同步方式,只有master和slave都写成功以后,才会向客户端返回成功
优点
数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点
会降低消息写入的效率,并影响系统的吞吐量
实际部署中,一般会根据业务场景的所需要的性能和消息可靠性等方面来选择后两种
04.你自己部署过RocketMq吗?简单说一下你当时部署的过程
由于我们项目中主要使用rocketMq做链路跟踪功能,因此需要比较高的性能,并且偶尔丢失几条消息也关系不大,所以我们就选择多Master多Slave模式,异步复制方式进行部署 部署过程简单说一下: 我部署的是双master和双slave模式集群,并部署了两个nameserver节点 1)服务器分配 分配是两台服务器,A和B,其中A服务器部署nameserv1,master1,slave2;B服务器部署nameserv2,master2和slave1节点  2)broker的配置 分别配置rocketmq安装目录下四个配置文件: master1:/conf/2m-2s-async/broker-a.properties slave2:/conf/2m-2s-async/broker-b-s.properties master2:/conf/2m-2s-async/broker-b.properties slave1:/conf/2m-2s-async/broker-a-s.properties  总的思路是: a.master节点的brokerId为0,slave节点的brokerId为1(大于0即可); b.同一组broker的broker-Name相同,如master1和slave1都为broker-a; c.每个broker节点配置相同的NameServer; d.复制方式配置:master节点配置为ASYNC-MASTER,slave节点配置为SLAVE即可; e.刷盘方式分为同步刷盘和异步刷盘,为了保证性能而不去考虑少量消息的丢失,因此统一配置为异步刷盘 3)启动集群 a 检查修改参数 启动前分别检查修改runbroker.sh和runserver.sh两个文件中的JVM参数,默认的JAVA_OPT参数的值比较大,若直接启动可能会失败,需要根据实际情况重新配置  b 分别启动两个namerser节点 nohup sh bin/mqnamesrv > /dev/null 2>&1 & 查看日志 tail -f ~/logs/rocketmqlogs/namesrv.log c 分别启动4个broker节点 maste1 nohup sh bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties & slave1 nohup sh bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties & maste2 nohup sh bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties & slave2 nohup sh bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties & 查看日志: tail -f ~/logs/rocketmqlogs/broker.log 总结:集群环境部署,主要就是以上三个步骤,需要注意的是过程中broker配置文件的配置正确性,还需要注意一下启动前对jvm参数的检查
05.rocketmq如何保证高可用性?
1)集群化部署NameServer
Broker集群会将所有的broker基本信息、topic信息以及两者之间的映射关系,轮询存储在每个NameServer中(也就是说每个NameServer存储的信息完全一样)。因此,NameServer集群化,不会因为其中的一两台服务器挂掉,而影响整个架构的消息发送与接收
2)集群化部署多broker
producer发送消息到broker的master,若当前的master挂掉,则会自动切换到其他的master cosumer默认会访问broker的master节点获取消息,那么master节点挂了之后,该怎么办呢?它就会自动切换到同一个broker组的slave节点进行消费 那么你肯定会想到会有这样一个问题:consumer要是直接消费slave节点,那master在宕机前没有来得及把消息同步到slave节点,那这个时候,不就会出现消费者不就取不到消息的情况了? 这样,就引出了下一个措施,来保证消息的高可用性
3)设置同步复制
消息发送到broker的master节点上,master需要将消息复制到slave节点上,rocketmq提供两种复制方式:同步复制和异步复制 异步复制,就是消息发送到master节点,只要master写成功,就直接向客户端返回成功,后续再异步写入slave节点 同步复制,就是等master和slave都成功写入内存之后,才会向客户端返回成功 那么,要保证高可用性,就需要将复制方式配置成同步复制,这样即使master节点挂了,slave上也有当前master的所有备份数据,那么不仅保证消费者消费到的消息是完整的,并且当master节点恢复之后,也容易恢复消息数据 在master的配置文件中直接配置brokerRole:SYNC_MASTER即可
06.rocketmq的工作流程是怎样的?
1)首先启动NameServer
NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来
2)启动Broker
启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系
3)创建Topic
创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
4)Producer发送消息
启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送
5)Consumer消费消息
跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费
07.RocketMq使用哪种方式消费消息,pull还是push?
RocketMq提供两种方式:pull和push进行消息的消费 而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的 push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的 其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~
08.RocketMq如何负载均衡?
1)producer发送消息的负载均衡
默认会轮询向Topic的所有queue发送消息,以达到消息平均落到不同的queue上;而由于queue可以落在不同的broker上,就可以发到不同broker上(当然也可以指定发送到某个特定的queue上)
2)consumer订阅消息的负载均衡
假设有5个队列,两个消费者,则第一个消费者消费3个队列,第二个则消费2个队列,以达到平均消费的效果。而需要注意的是,当consumer的数量大于队列的数量的话,根据rocketMq的机制,多出来的队列不会去消费数据,因此建议consumer的数量小于或者等于queue的数量,避免不必要的浪费
09.RocketMq的存储机制了解吗?
RocketMq采用文件系统进行消息的存储,相对于ActiveMq采用关系型数据库进行存储的方式就更直接,性能更高了 RocketMq与Kafka在写消息与发送消息上,继续沿用了Kafka的这两个方面:顺序写和零拷贝
1)顺序写
磁盘顺序写的性能极高,在磁盘个数一定,转数一定的情况下,基本和内存速度一致
2)零拷贝
零拷贝技术采用了MappedByteBuffer内存映射技术,采用这种技术有一些限制,其中有一条就是传输的文件不能超过2G,这也就是为什么RocketMq的存储消息的文件CommitLog的大小规定为1G的原因
RocketMq采用文件系统存储消息,并采用顺序写写入消息,使用零拷贝发送消息,极大得保证了RocketMq的性能
10.RocketMq的存储结构是怎样的?

消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G
CommitLog-存储所有的消息元数据,包括Topic、QueueId以及message
CosumerQueue-消费逻辑队列:存储消息在CommitLog的offset
IndexFile-索引文件:存储消息的key和时间戳等信息,使得RocketMq可以采用key和时间区间来查询消息
rocketMq将消息均存储在CommitLog中,并分别提供了CosumerQueue和IndexFile两个索引,来快速检索消息
11.RocketMq如何进行消息的去重?
rocketMq保证了同一个消费组只能消费一次,但会被不同的消费组重复消费,因此这种重复消费的情况不可避免
RocketMq本身并不保证消息不重复,这样肯定会因为每次的判断,导致性能打折扣,所以它将去重操作直接放在了消费端
1)消费端处理消息的业务逻辑保持幂等性。那么不管来多少条重复消息,可以实现处理的结果都一样
2)还可以建立一张日志表,使用消息主键作为表的主键,在处理消息前,先insert表,再做消息处理。这样可以避免消息重复消费
12.RocketMq性能比较高的原因?
RocketMq采用文件系统存储消息
采用顺序写的方式写入消息
使用零拷贝发送消息
13.请说说你对 Namesrv 的了解?
1、 Namesrv 用于存储 Topic、Broker 关系信息,功能简单,稳定性高
多个 Namesrv 之间相互没有通信,单台 Namesrv 宕机不影响其它 Namesrv 与集群。
多个 Namesrv 之间的信息共享,通过 Broker 主动向多个 Namesrv 都发起心跳。正如上文所说,Broker 需要跟所有 Namesrv 连接。
即使整个 Namesrv 集群宕机,已经正常工作的 Producer、Consumer、Broker 仍然能正常工作,但新起的 Producer、Consumer、Broker 就无法工作。
这点和 Dubbo 有些不同,不会缓存 Topic 等元信息到本地文件。
2、 Namesrv 压力不会太大,平时主要开销是在维持心跳和提供 Topic-Broker 的关系数据
Broker 向 Namesr 发心跳时,会带上当前自己所负责的所有 Topic 信息,如果 Topic 个数太多(万级别),会导致一次心跳中,就 Topic 的数据就几十 M,网络情况差的话,网络传输失败,心跳失败,导致Namesrv 误认为 Broker 心跳失败。
当然,一般公司,很难达到过万级的 Topic ,因为一方面体量达不到,另一方面 RocketMQ 提供了 Tag属性。
另外,内网环境网络相对是比较稳定的,传输几十 M 问题不大。同时,如果真的要优化,Broker 可以把心跳包做压缩,再发送给 Namesrv 。不过,这样也会带来 CPU 的占用率的提升。
14.请说说你对 Broker 的了解?
1、 高并发读写服务
消息顺序写,所有 Topic 数据同时只会写一个文件,一个文件满1G ,再写新文件,真正的顺序写盘,使得发消息 TPS 大幅提高。
消息随机读,RocketMQ 尽可能让读命中系统 Pagecache ,因为操作系统访问 Pagecache 时,即使只访问 1K 的消息,系统也会提前预读出更多的数据,在下次读时就可能命中 Pagecache ,减少 IO 操作。
2、 负载均衡与动态伸缩
负载均衡
Broker 上存 Topic 信息,Topic 由多个队列组成,队列会平均分散在多个 Broker 上,而Producer 的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker 上。
动态伸缩能力(非顺序消息)
Topic 维度
假如一个 Topic 的消息量特别大,但集群水位压力还是很低,就可以扩大该 Topic 的队列数, Topic 的队列数跟发送、消费速度成正比。
Topic 的队列数一旦扩大,就无法很方便的缩小。因为,生产者和消费者都是基于相同的队列数来处理。 如果真的想要缩小,只能新建一个 Topic ,然后使用它。不过,Topic 的队列数,也不存在什么影响的,淡定。
Broker 维度
如果集群水位很高了,需要扩容,直接加机器部署 Broker 就可以。Broker 启动后向 Namesrv 注册,Producer、Consumer 通过 Namesrv 发现新Broker,立即跟该 Broker 直连,收发消息。
新增的 Broker 想要下线,想要下线也比较麻烦,暂时没特别好的方案。大体的前提是,消费者消费完该 Broker 的消息,生产者不往这个 Broker 发送消息。
3、 高可用 & 高可靠
高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。
高可靠:所有发往 Broker 的消息,有同步刷盘和异步刷盘机制。
同步刷盘时,消息写入物理文件才会返回成功。
异步刷盘时,只有机器宕机,才会产生消息丢失,Broker 挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电。
如果 Broker 挂掉,未同步到硬盘的消息,还在 Pagecache 中呆着。
4、 Broker 与 Namesrv 的心跳机制
单个 Broker 跟所有 Namesrv 保持心跳请求,心跳间隔为30秒,心跳请求中包括当前 Broker 所有的Topic 信息。
Namesrv 会反查 Broker 的心跳信息,如果某个 Broker 在 2 分钟之内都没有心跳,则认为该 Broker 下线,调整 Topic 跟 Broker 的对应关系。但此时 Namesrv 不会主动通知Producer、Consumer 有Broker 宕机。也就说,只能等 Producer、Consumer 下次定时拉取 Topic 信息的时候,才会发现有Broker 宕机。
Broker 是 RocketMQ 中最最最复杂的角色,主要包括五个模块
Broker 组件图远程处理模块:是 Broker 的入口,处理来自客户的请求。
Client Manager :管理客户端(生产者/消费者),并维护消费者的主题订阅。
Store Service :提供简单的 API 来存储或查询物理磁盘中的消息。
HA 服务:提供主节点和从节点之间的数据同步功能。
索引服务:通过指定键为消息建立索引,并提供快速的消息查询。
15.请说说你对Producer的了解?
获得 Topic-Broker 的映射关系
Producer 启动时,也需要指定 Namesrv 的地址,从 Namesrv 集群中选一台建立长连接。如果该Namesrv 宕机,会自动连其他 Namesrv ,直到有可用的 Namesrv 为止。
生产者每 30 秒从 Namesrv 获取 Topic 跟 Broker 的映射关系,更新到本地内存中。然后再跟 Topic 涉及的所有 Broker 建立长连接,每隔 30 秒发一次心跳。
在 Broker 端也会每 10 秒扫描一次当前注册的 Producer ,如果发现某个 Producer 超过 2 分钟都没有发心跳,则断开连接。
生产者端的负载均衡
生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。
这里需要注意一点:假如某个 Broker 宕机,意味生产者最长需要 30 秒才能感知到。在这期间会向宕机的 Broker 发送消息。当一条消息发送到某个 Broker 失败后,会自动再重发 2 次,假如还是发送失败,则抛出发送失败异常。
客户端里会自动轮询另外一个 Broker 重新发送,这个对于用户是透明的。
Producer 发送消息方式
1. 同步方式
2. 异步方式
3. Oneway 方式
适合大数据场景,允许有一定消息丢失的场景
16.请说说你对Consumer的理解?
获得 Topic-Broker 的映射关系
Consumer 启动时需要指定 Namesrv 地址,与其中一个 Namesrv 建立长连接。消费者每隔 30 秒从Namesrv 获取所有Topic 的最新队列情况,这意味着某个 Broker 如果宕机,客户端最多要 30 秒才能感知。连接建立后,从 Namesrv 中获取当前消费 Topic 所涉及的 Broker,直连 Broker 。
Consumer 跟 Broker 是长连接,会每隔 30 秒发心跳信息到Broker 。Broker 端每 10 秒检查一次当前存活的 Consumer ,若发现某个 Consumer 2 分钟内没有心跳,就断开与该 Consumer 的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。
消费者端的负载均衡
根据消费者的消费模式不同,负载均衡方式也不同。消费者有两种消费模式:集群消费和广播消费。
集群消费:一个 Topic 可以由同一个消费者分组( Consumer Group )下所有消费者分担消费。 具体例子:假如 TopicA 有 6 个队列,某个消费者分组起了 2 个消费者实例,那么每个消费者负责消费 3 个队列。如果再增加一个消费者分组相同消费者实例,即当前共有 3 个消费者同时消费 6 个队列,那每个消费者负责 2 个队列的消费。
广播消费:每个消费者消费 Topic 下的所有队列。
消费者消费模式
集群消费
一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。
每个 Consumer 是平均分摊 Message Queue 去做拉取消费的。而由 Producer 发送消息的时候是轮询所有的队列,所以消息会平均散落在不同的队列上,可以认为队列上的消息是平均的。那么实例也就平均地消费消息了。这种模式下,消费进度的存储会持久化到 Broker 。
当新建一个 Consumer Group 时,默认情况下,该分组的消费者会从 min offset 开始重新消费消息。
广播消费
消息将对一 个Consumer Group 下的各个 Consumer 实例都投递一遍。消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
消费进度会存储持久化到实例本地。
17.浅谈如何解决RocketMQ消息堆积的问题
如果这些消息是允许丢失的,那么就紧急修改消费者系统的代码,在代码中对所有的消息都获取到后直接丢弃,也就是不做任何的处理,这样可以快速的将积压的百多万消息处理掉。这是丢弃的方案。
如果消息是不能丢弃掉的话,就必须在消费者系统底层依赖的Redis恢复之后,根据线上Topic的MessageQueue的数量来采取相应的处理方案。
假如Topic有20个MessageQueue,而目前只有4个消费者系统在消费,那么每个消费者系统会从5个MessageQueue里获取消息,所以此时仅仅依靠4个消费者系统是不能满足快速处理MQ中积压的百多万消息的。 此时可以再申请16台机器去部署16台消费者系统实例,然后20个消费者系统同时消费,每个实例消费一个MessageQueue的消息,此时的消费速度就会提高了5倍,很快就能将积压的百多万消息处理掉。
假如Topic有4个MessageQueue,而集群中的消费者系统也是正好4个,那么就是每个消费者系统去消费一个MessageQueue里的消息,此时就无法使用上面增加消费者实例的方案来提高消费速度了。 此时应该修改4个消费者系统的代码,让他们取到消息后不写入Redis,而是直接把消息写入一个新的Topic,这样速度就会很快了,因为只是读写MQ而已。 然后新的Topic有20个MessageQueue,再部署20台消费者实例,去消费新的Topic的消息后写入到Redis中,这样就可以快速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理。