导图社区 黑马-RocketMQ学习记录
这是一篇关于黑马-RocketMQ学习记录的思维导图。NameServer:几乎无状态节点,可集群部署,节点之间无任何信息同步。
编辑于2023-01-10 11:46:58 河北省RocketMQ
第一章:核心功能
MQ介绍
消息队列是一种“先进先出”的数据结构 
作用
应用解耦

流量削峰

数据分发

注意事项
可用性降低
一旦MQ挂了,就会对业务造成影响
如何保证MQ的高可用?
复杂度提高
MQ的加入增加了系统的复杂度,以前系统之间是同步的远程调用,现在是通过MQ进行异步调用。
如何保证消息没有被重复消费?
如何处理消息丢失情况?
如何保证消息传递的顺序性?
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发送消息,如果B成功、C失败、D失败。
如何保证消息处理的一致性?
各MQ产品比较

基本概念
消息(Message)
消息是指消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
主题(Topic)

一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。 producer:topic 1:n consumer:topic 1:1
Topic的创建模式
手动创建Topic时,有两种模式: 集群模式:该模式下创建的Topic在该集群中,所有Broker中的Queue数量是相同的。 Broker模式:该模式下创建的Topic在该集群中,每个Broker中的Queue数量可以不同。
自动创建Topic时,默认采用的是Broker模式,会为每个Broker默认创建4个Queue。
标签(Tag)
为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
Topic是消息的一级分类,Tag是消息的二级分类。
Topic:货物 tag=上海 tag=江苏 tag=浙江 ------- 消费者 ----- topic=货物 tag = 上海 topic=货物 tag = 上海|浙江 topic=货物 tag = *
队列(Queue)
一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。 
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
消息标识(MessageId/Key)
RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。
msgId:由producer端生成
生成规则为:producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode +当前时间 + AutomicInteger自增计数器
offsetMsgId:由broker端生成
生成规则为: brokerIp + 物理分区的offset(Queue中的偏移量)
key:由用户指定的业务相关的唯一标识
角色介绍

Producer
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。 生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。 一个生产者组可以同时发送多个主题的消息。
Consumer
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可 以接着消费原Consumer消费的Queue)的目标变得非常容易。

注意, 1)消费者组只能消费一个Topic的消息,不能同时消费多个Topic消息 2)一个消费者组中的消费者必须订阅完全相同的Topic
Name Server
NameServer是一个Broker与Topic路由(消息发送给哪个Broker)的注册中心,支持Broker的动态注册与发现。
Broker管理
接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。
路由信息管理
每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。 Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
路由注册
NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。
注意,这是与其它像zk、Eureka、Nacos等注册中心不同的地方。 这种NameServer的无状态方式,有什么优缺点: 优点:NameServer集群搭建简单,扩容简单。 缺点:对于Broker,必须明确指出所有NameServer地址。否则未指出的将不会去注册。也正因为如此,NameServer并不能随便扩容。
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。
路由剔除
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。
NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。
路由发现
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。默认客户端每30秒会拉取一次最新的路由。
客户端NameServer选择策略
这里的客户端指的是Producer与Consumer
客户端在配置时必须要写上NameServer集群的地址,那么客户端到底连接的是哪个NameServer节点呢? 客户端首先会生产一个随机数,然后再与NameServer节点数量取模,此时得到的就是所要连接的 节点索引,然后就会进行连接。如果连接失败,则会采用round-robin策略,逐个尝试着去连接其它节点。
首先采用的是随机策略进行的选择,失败后采用的是轮询策略。
Broker
Broker充当着消息中转角色,负责存储消息、转发消息。 
单机环境搭建
环境准备
Linux64位系统 JDK1.8(64位) 源码安装需要安装Maven 3.2.x
安装RocketMQ
1、下载二进制包 2、解压二进制包 
启动RocketMQ
1、修改启动参数
1、修改runserver.sh  2、修改runbroker.sh 
2、启动NameServer
# 1.启动NameServer(默认端口9876) nohup sh bin/mqnamesrv & # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/namesrv.log 
3、启动Broker
# 1.启动Broker nohup sh bin/mqbroker -n localhost:9876 & # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/broker.log 
测试RocketMQ
1、发送消息
# 1.设置环境变量 export NAMESRV_ADDR=localhost:9876 # 2.使用安装包的Demo发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2、接受消息
# 1.设置环境变量 export NAMESRV_ADDR=localhost:9876 # 2.接收消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ
# 1.关闭NameServer sh bin/mqshutdown namesrv # 2.关闭Broker sh bin/mqshutdown broker
高可用集群搭建

集群搭建方式
1、各个角色的集群特点
NameServer
几乎无状态节点,可集群部署,节点之间无任何信息同步
Broker
Broker分为Master与Slave
每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave
Producer
Producer完全无状态,可集群部署
与NameServer集群中的其中一个节点(随机选择)建立长连接
定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳
Consumer
与NameServer集群中的其中一个节点(随机选择)建立长连接
定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳
Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
2、集群模式
单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
多Master模式
优点:1、配置简单,单个Master宕机或重启维护对应用无影响 2、在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢) 缺点:1、单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响
多Master多Slave模式(异步)
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级) 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样 缺点:Master宕机,磁盘损坏情况下会丢失少量消息
多Master多Slave模式(同步)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高且目前版本在主节点宕机后,备机不能自动切换为主机
双主双从集群搭建(同步)
 
集群工作流程
1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。 2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包。 3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。 4)Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。 5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。
1、环境准备
1、Host添加信息
vim /etc/hosts 配置信息: # nameserver 192.168.209.129 rocketmq-nameserver1 192.168.209.133 rocketmq-nameserver2 # broker 192.168.209.129 rocketmq-master1 192.168.209.129 rocketmq-slave2 192.168.209.133 rocketmq-master2 192.168.209.133 rocketmq-slave1 重启:systemctl restart network 
2、防火墙配置
1、关闭防火墙
# 关闭防火墙 systemctl stop firewalld.service # 查看防火墙的状态 firewall-cmd --state # 禁止firewall开机启动 systemctl disable firewalld.service
2、开放指定端口
# 开放name server默认端口 firewall-cmd --remove-port=9876/tcp --permanent # 开放master默认端口 firewall-cmd --remove-port=10911/tcp --permanent # 开放slave默认端口 (当前集群模式可不开启) firewall-cmd --remove-port=11011/tcp --permanent # 重启防火墙 firewall-cmd --reload
3、设置mq环境变量
vim /etc/profile 添加信息: #set rocketmq ROCKETMQ_HOME=/home/RocketMQ/rocketmq-all-4.4.0-bin-release/ PATH=$PATH:$ROCKETMQ_HOME/bin export ROCKETMQ_HOME PATH source /etc/profile
4、创建消息存储路径
mkdir /home/RocketMQ/store-m mkdir /home/RocketMQ/store-m/commitlog mkdir /home/RocketMQ/store-m/consumequeue mkdir /home/RocketMQ/store-m/index mkdir /home/RocketMQ/store-s mkdir /home/RocketMQ/store-s/commitlog mkdir /home/RocketMQ/store-s/consumequeue mkdir /home/RocketMQ/store-s/index 
2、Broker配置

192.168.209.129

master1
将默认注释掉,粘贴以下配置: #所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #一定要指明IP地址 brokerIP1=192.168.209.129 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/home/RocketMQ/store-m #commitLog 存储路径 storePathCommitLog=/home/RocketMQ/store-m/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/home/RocketMQ/store-m/consumequeue #消息索引存储路径 storePathIndex=/home/RocketMQ/store-m/index #checkpoint 文件存储路径 storeCheckpoint=/home/RocketMQ/store-m/checkpoint #abort 文件存储路径 abortFile=/home/RocketMQ/store-m/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 #开启支持SQL过滤 enablePropertyFilter=true 
slave2
将默认注释掉,粘贴以下配置: #所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=1 #一定要指明IP地址 brokerIP1=192.168.209.129 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/home/RocketMQ/store-s #commitLog 存储路径 storePathCommitLog=/home/RocketMQ/store-s/commitlog #消费队列存储路径存储路径 storePathConsumeQueue/home/RocketMQ/store-s/consumequeue #消息索引存储路径 storePathIndex=/home/RocketMQ/store-s/index #checkpoint 文件存储路径 storeCheckpoint=/home/RocketMQ/store-s/checkpoint #abort 文件存储路径 abortFile=/home/RocketMQ/store-s/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 #开启支持SQL过滤 enablePropertyFilter=true 
192.168.209.133

master2
将默认注释掉,粘贴以下配置: #所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=0 #一定要指明IP地址 brokerIP1=192.168.209.133 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/home/RocketMQ/store-m #commitLog 存储路径 storePathCommitLog=/home/RocketMQ/store-m/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/home/RocketMQ/store-m/consumequeue #消息索引存储路径 storePathIndex=/home/RocketMQ/store-m/index #checkpoint 文件存储路径 storeCheckpoint=/home/RocketMQ/store-m/checkpoint #abort 文件存储路径 abortFile=/home/RocketMQ/store-m/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 #开启支持SQL过滤 enablePropertyFilter=true 
slave1
将默认注释掉,粘贴以下配置: #所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 #一定要指明IP地址 brokerIP1=192.168.209.133 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/home/RocketMQ/store-s #commitLog 存储路径 storePathCommitLog=/home/RocketMQ/store-s/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/home/RocketMQ/store-s/consumequeue #消息索引存储路径 storePathIndex=/home/RocketMQ/store-s/index #checkpoint 文件存储路径 storeCheckpoint=/home/RocketMQ/store-s/checkpoint #abort 文件存储路径 abortFile=/home/RocketMQ/store-s/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 #开启支持SQL过滤 enablePropertyFilter=true 
3、修改启动参数

runbroker.sh

runserver.sh

4、启动服务
启动name server
分别在两台服务器上启动name server:(默认端口9876) nohup sh mqnamesrv &
启动Broker集群
192.168.209.129
启动master1: nohup sh mqbroker -c /home/RocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties & 启动slave2: nohup sh mqbroker -c /home/RocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties & 
192.168.209.133
启动master2:nohup sh mqbroker -c /home/RocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties & 启动slave1: nohup sh mqbroker -c /home/RocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties & 
5、查看日志
# 查看nameServer日志 tail -500f ~/logs/rocketmqlogs/namesrv.log # 查看broker日志 tail -500f ~/logs/rocketmqlogs/broker.log
搭建集群监控平台
1、下载并解压zip包
 
2、修改配置文件
\rocketmq-dashboard-master\src\main\resources\application.yml  
3、打包
1、进入到rocketmq的目录  2、执行命令:mvn clean package -Dmaven.test.skip=true 3、如果报错:[ERROR] Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.11.3:yarn (yarn install) on project rocketmq-dashboard: Failed to run task: 'yarn install' failed. org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1] 解决方法:注释掉pom中的这里 
4、将jar包上传至服务器
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar 
5、打开页面

消息的发送与消费
1、导入mq客户端依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> 
2、发送消息步骤(同步)
1.创建消息生产者producer,并制定生产者组名 2.指定Nameserver地址 3.启动producer 4.创建消息对象,指定主题Topic、Tag和消息体 5.发送消息 6.关闭生产者producer 
3、消费消息步骤(默认负载均衡)
1.创建消费者Consumer,制定消费者组名 2.指定Nameserver地址 3.订阅主题Topic和Tag 4.设置回调函数,处理消息 5.启动消费者consumer 
各种消息发送样例
基本样例
producer
同步消息

异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应 
单向消息
主要用在不特别关心发送结果的场景,例如日志发送 
consumer
默认负载均衡 
负载均衡

广播模式

顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。 
订单样例
package com.wzl1999.rocketmq.order; import java.util.ArrayList; import java.util.List; public class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } /** * 生成模拟订单数据 */ public static List<OrderStep> buildOrders() { List<OrderStep> orderList = new ArrayList<OrderStep>(); //创建 付款 推送 完成 OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
producer

consumer

延迟消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。 延迟时间限制:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
producer

consumer

批量消息
批量发送消息能显著提高传递小消息的性能。 限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
producer
String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //处理error }
消息列表分割
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下: //把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //处理error } } package com.wzl1999.rocketmq.batch; import org.apache.rocketmq.common.message.Message; import java.util.Iterator; import java.util.List; import java.util.Map; public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int startIndex = getStartIndex(); int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } private int getStartIndex() { Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) { currIndex += 1; Message message = messages.get(currIndex); tmpSize = calcMessageSize(message); } return currIndex; } private int calcMessageSize(Message message) { int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节 return tmpSize; } }
过滤消息
tag过滤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
sql过滤
SQL特性可以通过发送消息时的属性来进行计算 
基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 数值比较,比如:>,>=,<,<=,BETWEEN,=; 字符比较,比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号 AND,OR,NOT; 常量支持类型为: 数值,比如:123,3.1415; 字符,比如:'abc',必须用单引号包裹起来; NULL,特殊的常量 布尔值,TRUE 或 FALSE 只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下: public void subscribe(finalString topic, final MessageSelector messageSelector)
producer
发送消息时,你能通过 putUserProperty 来设置消息的属性 
consumer
用 MessageSelector.bySql 来使用sql筛选消息  如果报错:不支持92语法  修改broker配置文件,添加:enablePropertyFilter=true
事务消息
事务消息共有三种状态,提交状态、回滚状态、中间状态: TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息 TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费 TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态 
producer

使用限制
事务消息不支持延时消息和批量消息 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数 事务性消息可能不止一次被检查或消费 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
第二章:项目实战
1.业务介绍
下单

如何保证数据的完整性?
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退 
将失败信息发送至MQ,其他服务同时监听

支付

如何快速给第三方支付凭条做出回应?
用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。 商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应? 
MQ数据分发

2.技术分析

3.环境搭建
SpringBoot整合RocketMQ
producer
1、引入依赖

2、编写配置文件

3、写启动类

4、测试

consumer
1、引入依赖

2、编写配置文件

3、写启动类

4、编写实现类

SpringBoot整合Dubbo
1、搭建zookeeper集群(注册中心)
1、准备三台服务器(单机模拟)
2、将安装包上传至服务器

3、将文件复制三份,模拟三台机器

4、创建zkData目录

5、配置服务器ID

6、修改配置文件

7.相关命令
(1)启动 Zookeeper
./zkServer.sh start
(2)查看状态

(3)启动客户端
./zkCli.sh
(4)退出客户端
[zk: localhost:2181(CONNECTED) 0] quit
(5)停止 Zookeeper
/zkServer.sh stop
2、搭建dubbo-admin管理平台
1、将tomcat安装包上传至zookeeper服务器

2、将dubbo-admin.war包放入webapps下面

3、启动tomcat

4、访问http://ip:8080/dubbo-admin
 
3、定义RPC服务接口

4、provider
1、引入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.wzl1999</groupId> <artifactId>springboot-dubbo-provider</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> </parent> <dependencies> <!--dubbo--> <dependency> <groupId>com.alibaba.spring.boot</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <!--spring-boot-stater--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <artifactId>log4j-to-slf4j</artifactId> <groupId>org.apache.logging.log4j</groupId> </exclusion> </exclusions> </dependency> <!--zookeeper--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.9</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <!--自定义的RPC接口--> <dependency> <groupId>org.wzl1999</groupId> <artifactId>springboot-dubbo-interface</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
2、编写配置文件

3、编写启动类

4、实现RPC接口

5、consumer
1、引入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.wzl1999</groupId> <artifactId>springboot-dubbo-consumer</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--dubbo--> <dependency> <groupId>com.alibaba.spring.boot</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <artifactId>log4j-to-slf4j</artifactId> <groupId>org.apache.logging.log4j</groupId> </exclusion> </exclusions> </dependency> <!--zookeeper--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.9</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <!--自定义的RPC接口--> <dependency> <groupId>org.wzl1999</groupId> <artifactId>springboot-dubbo-interface</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
2、编写配置文件

3、编写启动类

4、编写接口,调用provider

6、测试

Mysql
/* SQLyog Ultimate v8.32 MySQL - 5.5.49 : Database - trade ********************************************************************* */ /*!40101 SET NAMES utf8 */; /*!40101 SET SQL_MODE=''*/; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; CREATE DATABASE /*!32312 IF NOT EXISTS*/`trade` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `trade`; /*Table structure for table `trade_coupon` */ DROP TABLE IF EXISTS `trade_coupon`; CREATE TABLE `trade_coupon` ( `coupon_id` bigint(50) NOT NULL COMMENT '优惠券ID', `coupon_price` decimal(10,2) DEFAULT NULL COMMENT '优惠券金额', `user_id` bigint(50) DEFAULT NULL COMMENT '用户ID', `order_id` bigint(32) DEFAULT NULL COMMENT '订单ID', `is_used` int(1) DEFAULT NULL COMMENT '是否使用 0未使用 1已使用', `used_time` timestamp NULL DEFAULT NULL COMMENT '使用时间', PRIMARY KEY (`coupon_id`), KEY `FK_trade_coupon` (`user_id`), KEY `FK_trade_coupon2` (`order_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_coupon` */ /*Table structure for table `trade_goods` */ DROP TABLE IF EXISTS `trade_goods`; CREATE TABLE `trade_goods` ( `goods_id` bigint(50) NOT NULL AUTO_INCREMENT, `goods_name` varchar(255) DEFAULT NULL COMMENT '商品名称', `goods_number` int(11) DEFAULT NULL COMMENT '商品库存', `goods_price` decimal(10,2) DEFAULT NULL COMMENT '商品价格', `goods_desc` varchar(255) DEFAULT NULL COMMENT '商品描述', `add_time` timestamp NULL DEFAULT NULL COMMENT '添加时间', PRIMARY KEY (`goods_id`) ) ENGINE=InnoDB AUTO_INCREMENT=345959443973935105 DEFAULT CHARSET=utf8; /*Data for the table `trade_goods` */ insert into `trade_goods`(`goods_id`,`goods_name`,`goods_number`,`goods_price`,`goods_desc`,`add_time`) values (345959443973935104,'华为P30',999,'5000.00','夜间拍照更美','2019-07-09 20:38:00'); /*Table structure for table `trade_goods_number_log` */ DROP TABLE IF EXISTS `trade_goods_number_log`; CREATE TABLE `trade_goods_number_log` ( `goods_id` bigint(50) NOT NULL COMMENT '商品ID', `order_id` bigint(50) NOT NULL COMMENT '订单ID', `goods_number` int(11) DEFAULT NULL COMMENT '库存数量', `log_time` timestamp NULL DEFAULT NULL, PRIMARY KEY (`goods_id`,`order_id`), KEY `FK_trade_goods_number_log2` (`order_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_goods_number_log` */ /*Table structure for table `trade_mq_consumer_log` */ DROP TABLE IF EXISTS `trade_mq_consumer_log`; CREATE TABLE `trade_mq_consumer_log` ( `msg_id` varchar(50) DEFAULT NULL, `group_name` varchar(100) NOT NULL, `msg_tag` varchar(100) NOT NULL, `msg_key` varchar(100) NOT NULL, `msg_body` varchar(500) DEFAULT NULL, `consumer_status` int(1) DEFAULT NULL COMMENT '0:正在处理;1:处理成功;2:处理失败', `consumer_times` int(1) DEFAULT NULL, `consumer_timestamp` timestamp NULL DEFAULT NULL, `remark` varchar(500) DEFAULT NULL, PRIMARY KEY (`group_name`,`msg_tag`,`msg_key`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_mq_consumer_log` */ /*Table structure for table `trade_mq_producer_temp` */ DROP TABLE IF EXISTS `trade_mq_producer_temp`; CREATE TABLE `trade_mq_producer_temp` ( `id` varchar(100) NOT NULL, `group_name` varchar(100) DEFAULT NULL, `msg_topic` varchar(100) DEFAULT NULL, `msg_tag` varchar(100) DEFAULT NULL, `msg_key` varchar(100) DEFAULT NULL, `msg_body` varchar(500) DEFAULT NULL, `msg_status` int(1) DEFAULT NULL COMMENT '0:未处理;1:已经处理', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_mq_producer_temp` */ /*Table structure for table `trade_order` */ DROP TABLE IF EXISTS `trade_order`; CREATE TABLE `trade_order` ( `order_id` bigint(50) NOT NULL COMMENT '订单ID', `user_id` bigint(50) DEFAULT NULL COMMENT '用户ID', `order_status` int(1) DEFAULT NULL COMMENT '订单状态 0未确认 1已确认 2已取消 3无效 4退款', `pay_status` int(1) DEFAULT NULL COMMENT '支付状态 0未支付 1支付中 2已支付', `shipping_status` int(1) DEFAULT NULL COMMENT '发货状态 0未发货 1已发货 2已收货', `address` varchar(255) DEFAULT NULL COMMENT '收货地址', `consignee` varchar(255) DEFAULT NULL COMMENT '收货人', `goods_id` bigint(50) DEFAULT NULL COMMENT '商品ID', `goods_number` int(11) DEFAULT NULL COMMENT '商品数量', `goods_price` decimal(10,2) DEFAULT NULL COMMENT '商品价格', `goods_amount` decimal(10,0) DEFAULT NULL COMMENT '商品总价', `shipping_fee` decimal(10,2) DEFAULT NULL COMMENT '运费', `order_amount` decimal(10,2) DEFAULT NULL COMMENT '订单价格', `coupon_id` bigint(50) DEFAULT NULL COMMENT '优惠券ID', `coupon_paid` decimal(10,2) DEFAULT NULL COMMENT '优惠券', `money_paid` decimal(10,2) DEFAULT NULL COMMENT '已付金额', `pay_amount` decimal(10,2) DEFAULT NULL COMMENT '支付金额', `add_time` timestamp NULL DEFAULT NULL COMMENT '创建时间', `confirm_time` timestamp NULL DEFAULT NULL COMMENT '订单确认时间', `pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间', PRIMARY KEY (`order_id`), KEY `FK_trade_order` (`user_id`), KEY `FK_trade_order2` (`goods_id`), KEY `FK_trade_order3` (`coupon_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_order` */ /*Table structure for table `trade_pay` */ DROP TABLE IF EXISTS `trade_pay`; CREATE TABLE `trade_pay` ( `pay_id` bigint(50) NOT NULL COMMENT '支付编号', `order_id` bigint(50) DEFAULT NULL COMMENT '订单编号', `pay_amount` decimal(10,2) DEFAULT NULL COMMENT '支付金额', `is_paid` int(1) DEFAULT NULL COMMENT '是否已支付 1否 2是', PRIMARY KEY (`pay_id`), KEY `FK_trade_pay` (`order_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_pay` */ /*Table structure for table `trade_user` */ DROP TABLE IF EXISTS `trade_user`; CREATE TABLE `trade_user` ( `user_id` bigint(50) NOT NULL AUTO_INCREMENT COMMENT '用户ID', `user_name` varchar(255) DEFAULT NULL COMMENT '用户姓名', `user_password` varchar(255) DEFAULT NULL COMMENT '用户密码', `user_mobile` varchar(255) DEFAULT NULL COMMENT '手机号', `user_score` int(11) DEFAULT NULL COMMENT '积分', `user_reg_time` timestamp NULL DEFAULT NULL COMMENT '注册时间', `user_money` decimal(10,0) DEFAULT NULL COMMENT '用户余额', PRIMARY KEY (`user_id`) ) ENGINE=InnoDB AUTO_INCREMENT=345963634385633281 DEFAULT CHARSET=utf8; /*Data for the table `trade_user` */ insert into `trade_user`(`user_id`,`user_name`,`user_password`,`user_mobile`,`user_score`,`user_reg_time`,`user_money`) values (345963634385633280,'刘备','123L','18888888888L',100,'2019-07-09 13:37:03','900'); /*Table structure for table `trade_user_money_log` */ DROP TABLE IF EXISTS `trade_user_money_log`; CREATE TABLE `trade_user_money_log` ( `user_id` bigint(50) NOT NULL COMMENT '用户ID', `order_id` bigint(50) NOT NULL COMMENT '订单ID', `money_log_type` int(1) NOT NULL COMMENT '日志类型 1订单付款 2 订单退款', `use_money` decimal(10,2) DEFAULT NULL, `create_time` timestamp NULL DEFAULT NULL COMMENT '日志时间', PRIMARY KEY (`user_id`,`order_id`,`money_log_type`), KEY `FK_trade_user_money_log2` (`order_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `trade_user_money_log` */ /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; 
优惠卷表

商品表

订单表

订单商品日志表

用户表

用户余额日志表

订单支付表

MQ消息生产表

MQ消息消费表

4.下单业务
5.支付业务
6.整体联调
第三章:高级功能和源码分析
高级功能部分
1.消息存储

1.存储介质
关系型数据库
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
性能对比
文件系统 > 关系型数据库DB
2.消息的存储和发送
存储
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。 RocketMQ的消息用顺序写,保证了消息存储的速度。
发送
RocketMQ采用零拷贝技术,省去了用户态过程,默认单个CommitLog日志文件大小为1G 
3.消息存储结构
RocketMQ消息的存储是由CommitLog和ConsumerQueue配合完成的。  消息真正的物理存储文件是CommitLog(存储消息的元数据)。 ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址(存储消息在CommitLog的索引)。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。 IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程  
4.刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。 RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候有两种写磁盘方式:分布式同步刷盘和异步刷盘。  同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。 
同步刷盘
在返回写成功状态时,消息已经被写入磁盘。 具体流程是:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大; 当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
5.消息主从复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。 
1.同步复制
同步复制方式:等Master和Slave均写成功后才反馈给生产者写成功状态; 在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
2.异步复制
异步复制方式:只要Master写成功即可反馈给生产者写成功状态。 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。  
推荐配置:异步刷盘+主从同步复制

4.高可用机制
 RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。 Master和Slave的区别:在Broker的配置文件中,参数brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。 Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
1.消费消息高可用
Broker主从架构实现高可用! 在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读! 当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
2.生产消息高可用
Broker多主多从架构! 在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。
5.负载均衡
Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:  图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至Queue 0,然后第二条消息发送至Queue 1,以此类推。
Consumer负载均衡
集群模式

广播模式

6.消息重试
1.顺序消息的重试
当消费者消费顺序消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为1 秒),这时应用会出现消息消费被阻塞的情况。 因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
2.无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。 无序消息的重试只针对集群消费方式生效; 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
重试次数

获取消息重试次数

编码配置
1、配置消费重试
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置 (三种方式任选一种): - 返回 Action.ReconsumeLater (推荐) - 返回 Null - 抛出异常 
2、配置消费不重试
集群消费方式下,消息失败后期望消息不重试。 1、捕获消费逻辑中可能抛出的异常 2、最终返回 Action.CommitMessage 
3、自定义最大重试次数

7.死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)
1.死信特性

2.查看死信信息

8.消息幂等
幂等性:(处理必须唯一) 无论这个业务请求被(consumer)执行多少次,我们的数据库的结果都是唯一的,不可变的。 造成重复消费的原因? 因为网络原因闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。 消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
发送消息幂等
消费消息幂等

源码分析
路由中心NameServer
NameServer架构设计
NameServer启动流程
NameServer路由注册和故障剔除
消息生产者Producer
生产者启动流程
生产者发送消息流程
批量发送
消息存储
消息存储流程
存储文件与内存映射
存储文件
实时更新消息消费队列和存储文件
消息队列与索引文件恢复
刷盘机制
过期文件删除机制
消息消费Consumer
消费者启动流程
消息拉取
消息队列负载均衡和重新分布机制
消息消费过程
定时消息机制
顺序消息