导图社区 消息队列
消息队列、生产者发送消息的请求,生产者将消息发送至Kafka集群中的某个Broker,Broker接收到此请求后持久化此消息并更新相关元数据信息。
编辑于2022-11-10 10:16:25时间管理-读书笔记,通过学习和应用这些方法,读者可以更加高效地利用时间,重新掌控时间和工作量,实现更高效的工作和生活。
本书是法兰教授的最新作品之一,主要阐明了设计史的来源、设计史现在的状况以及设计史的未来发展可能等三个基本问题。通过对设计史学科理论与方法的讨论,本书旨在促进读者对什么是设计史以及如何写作一部好的设计史等问题的深入认识与反思。
《计算机组成原理》涵盖了计算机系统的基本组成、数据的表示与运算、存储系统、指令系统、中央处理器(CPU)、输入输出(I/O)系统以及外部设备等关键内容。通过这门课程的学习,学生可以深入了解计算机硬件系统的各个组成部分及其相互之间的连接方式,掌握计算机的基本工作原理。
社区模板帮助中心,点此进入>>
时间管理-读书笔记,通过学习和应用这些方法,读者可以更加高效地利用时间,重新掌控时间和工作量,实现更高效的工作和生活。
本书是法兰教授的最新作品之一,主要阐明了设计史的来源、设计史现在的状况以及设计史的未来发展可能等三个基本问题。通过对设计史学科理论与方法的讨论,本书旨在促进读者对什么是设计史以及如何写作一部好的设计史等问题的深入认识与反思。
《计算机组成原理》涵盖了计算机系统的基本组成、数据的表示与运算、存储系统、指令系统、中央处理器(CPU)、输入输出(I/O)系统以及外部设备等关键内容。通过这门课程的学习,学生可以深入了解计算机硬件系统的各个组成部分及其相互之间的连接方式,掌握计算机的基本工作原理。
消息队列
消息队列
理论
WHY
应用场景
缓冲和削峰 :
上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
解耦和扩展性 :
项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
冗余 :
可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
健壮性 :
消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
异步通信 :
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
可恢复性:
系统的一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka可保证一个分区内的消息是有序的。
WHAT
消息模型
队列模型
图
生产者(Producer)发消息就是入队操作,
消费者(Consumer)收消息就是出队也就是删除操作,
服务端存放消息的容器自然就称为“队列”。
发布 - 订阅模型
图
消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。
发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
HOW
数据传输的事务定义
最多一次:
消息不会被重复发送,最多被传输一次,但也有可能一次不传输
最少一次:
消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
精确的一次( Exactly once)
不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的
主要产品
图
Kafka、RabbitMQ、RocketMQ的调研比对
Kafka
优势:
在性能方面kafka可以说是业界非常优秀的一款中间件,在常规的机器配置下,一台机器可以达到每秒几十万的QPS。并且Kafka的性能也非常高,基本上发给kafka的消息都是毫秒级别的,
可用性也特别高,kafka是支持集群部署的,并且其中部分机器宕机,还是可以运行的。
劣势:
kafka有可能会丢失数据,因为kafka收到消息之后,会写一个磁盘缓冲区里,并没有直接落地到物理磁盘上去,所以机器故障之后,可能会导致磁盘缓冲区的数据丢失。
kafka的功能比较单一,主要是支持发送消息给它,然后从里面消费消息,其它就没有什么额外的高级功能了,所以基于kafka有限的功能,可能适用的场景并不是很多。
综上所述:一般公司会利用kafka收集一些日志之类的消息,因为日志一般量特别大,即使丢几条数据也没事,并且要求吞吐量也高,一般就是收发消息,不需要太多的功能,所以kafka非常适合这个场景。
RabbitMQ
优势:
在RocketMQ没有出现之前,好多公司都从ActiveMQ切换到了RabbitMQ,它的优势在于可以保证数据不丢失,也能保证高可用性,即使集群部署部分机器宕机也能运行,然后支持部分高级功能,比如死信队列,消息重试之类的。
缺点:
RabbitMQ的吞吐量比较低,一般就是几万的级别,如果遇到特别高的并发时,支撑起来有点困难。并且进行集群的扩展也是比较麻烦的。还有就是开发语言用的是erlang,国内使用此语言的很少,所以对其深入的研究也是比较麻烦的。
RocketMQ
优点:
RocketMQ几乎同时解决了Kafka和RabbitMQ的缺陷。
它的吞吐量也非常高,单机可以达到10万的QPS以上,
可以保证高可用性,
可以通过配置达到数据保证不会丢失,
可以部署大规模的集群,
支持各种高级功能,比如说延迟消息、事务消息、消息回溯、死信队列、消息积压等。
是利用java开发的,符合国内的大多数公司的技术栈,很容易进行阅读源码和修改其内容。
缺点:
RocketMQ的官方文档相比较于kafka和RabbitMQ来说的话会相对简单一些,没有人家kafka和RabbitMQ的文档写的详细。
kafka
架构图
what
架构组成
broker:
Kafka 服务器,负责消息存储和转发
broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。
topic:
消息类别, Kafka 按照 topic 来分类消息
partition:
topic 的分区,一个 topic 可以包含多个 partition, topic 消息保存在各个partition 上
offset:
消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表该消息的唯一序号
Producer:
消息生产者
Consumer:
消息消费者
Consumer Group:
消费者分组,每个 Consumer 必须属于一个 group
Zookeeper:
保存着集群 broker、 topic、 partition 等 meta 数据;另外,还负责 broker 故障发现
partition leader:
选举,负载均衡等功能
消息模型
消费者
消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量( offset)的位置等到下次消费时,他会接着上次位置继续消费
消费者负载均衡策略
一个消费者组中的一个分片对应一个消费者成员,他能保证每个消费者成员都能访问,如果组中成员太多会有空闲的成员
数据有序
一个消费者组里它的内部是有序的 消费者组与消费者组之间是无序的
维护消费状态跟踪的方法
Kafka 采用了不同的策略。Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。这带来了另外一个好处:consumer 可以把 offset 调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?
消息丢失和重复消费
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,一旦 consumer 处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题, 很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到 consumer 已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,
但产生了新问题,首先如果 consumer处理消息成功了但是向 broker 发送响应时失败了,这条消息将被消费两次。第二个问题时,broker 必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,
1、消息发送
kafka 的 ack 机制
request.required.acks 有三个值 0 1 -1
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他不确保是否复制完成新 leader 也会导致数据丢失
-1:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的 ack,这样数据不会丢失
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。
Kafka通过配置request.required.acks属性来确认消息的生产:0---表示不进行消息接收是否成功的确认;1---表示当Leader接收成功时确认;-1---表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
2、消息消费Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;解决办法:
针对消息丢失:
同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;
异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。
消息顺序性
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
Pull 模式
最终 Kafka 还是选取了传统的 pull 模式
Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据。
Push模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。
Pull 模式下, consumer 就可以根据自己的消费能力去决定这些策略
Pull 有个缺点是,如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到 达。为了避免这点, Kafka 有个参数可以让 consumer 阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发)
Kafka 的消息模型和 RocketMQ 是完全一样的
队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)
与传统消息系统区别
Kafka 持久化日志,这些日志可以被重复读取和无限期保留
Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
Kafka 支持实时的流式处理
配置相关
生产数据时数据的分组策略
生产者决定数据产生到集群的哪个 partition 中 每一条消息都是以( key, value)格式 Key是由生产者发送数据传入 所以生产者( key)决定了数据产生到集群的哪个 partition
实现延迟队列
Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。
时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。
TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。
无消息丢失的配置
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1
。确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。确保消息消费完成再提交。
Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关
消费指定分区消息
Kafka consumer 消费消息时,向 broker 发出"fetch"请求去消费特定分区的消息, consumer指定消息在日志中的偏移量( offset),就可以消费从这个位置开始的消息, customer 拥有了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的
Kafka 创建 Topic 时如何将分区放置到不同的 Broker 中
(1).副本因子不能大于 Broker 的个数;
(2).第一个分区(编号为 0)的第一个副本放置位置是随机从 brokerList 选择的;
(3).其他分区的第一个副本放置位置相对于第 0 个分区依次往后移。也就是如果我们有 5 个Broker, 5 个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个Broker 上,依次类推;
(4).剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的
通信协议
图
详情
ProducerRequest:
生产者发送消息的请求,生产者将消息发送至Kafka集群中的某个Broker,Broker接收到此请求后持久化此消息并更新相关元数据信息。
TopicMetadataRequest:
获取Topic元数据信息的请求,无论是生产者还是消费者都需要通过此请求来获取感兴趣的Topic的元数据。
FetchRequest:
消费者获取感兴趣Topic的某个分区的消息的请求,除此之外,分区状态为Follower的副本也需要利用此请求去同步分区状态为Leader的对应副本数据。
OffsetRequest:
消费者发送至Kafka集群来获取感兴趣Topic的分区偏移量的请求,通过此请求可以获知当前Topic所有分区在不同时间段的偏移量详情。
OffsetCommitRequest:
消费者提交Topic被消费的分区偏移量信息至Broker,Broker接收到此请求后持久化相关偏移量信息。
OffsetFetchRequest:
消费者发送获取提交至Kafka集群的相关Topic被消费的详细信息,和OffsetCommitRequest相互对应。
集群
Kafka 判断一个节点是否还活着有那两个条件
( 1)节点必须可以维护和 ZooKeeper 的连接, Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
副本 :ISR、AR
ISR: In-Sync Replicas 副本同步队列
AR: Assigned Replicas 所有副本
ISR 是由leader维护,follower从leader同步数据有一些延迟
(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),
任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
什么情况下一个 broker 会从 isr中踢出去
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护 ,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。
producer 是否直接将数据发送到broker 的 leader(主节点)?
producer 直接将数据发送到 broker 的 leader(主节点),不需要在多个节点进行分发,为了帮助 producer 做到这点,所有的 Kafka 节点都可以及时的告知:哪些节点是活动的,目标topic 目标分区的 leader 在哪。这样 producer 就可以直接将消息发送到目的地了
why
磁盘顺序读写。
Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。
利用了操作系统本身的Page Cache,
就是利用操作系统自身的内存而不是JVM空间内存,为了优化读写性能。
“零拷贝” 机制
使用了sendfile方法, 允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。示意图如下:
分区分段+索引的设计
Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度
启用批次写入
在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。
它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
高效文件存储
Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
通过索引信息可以快速定位 message 和确定 response 的最大大小。
通过 index 元数据全部映射到 memory,可以避免 segment file 的 IO 磁盘操作。
通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。
存储消息格式
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和 CRC32校验码。
1. 消息长度: 4 bytes (value: 1+4+n)
2. 版本号: 1 byte 3CRC
3. 校验码: 4 bytes
4. 具体的消息: n bytes
partition 数据保存
topic 中的多个 partition 以文件夹的形式保存到 broker,每个分区序号从 0 递增,且消息有序 Partition 文件下有多个 segment( xxx.index, xxx.log)
segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为 1g 如果大小大于 1g 时,会滚动一个新的 segment 并且以上一个 segment 最后一条消息的偏移量命名
Kafka 新建的分区会在哪个目录下创建
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区 ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。
rabbitMQ
架构图
架构
消息模型
少数依然坚持使用队列模型的产品之一。
Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。
组件
交换器
主要有以下4种。
①fanout:所有bind到此exchange的queue都可以接收消息(纯广播,绑定到RabbitMQ的接受者都能收到消息);
②direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息;
③topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息;
匹配规则:
RoutingKey 为一个 点号'.': 分隔的字符串。比如: java.xiaoka.show
BindingKey和RoutingKey一样也是点号“.“分隔的字符串。
BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个
headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。
vhost
vhost可以理解为虚拟broker ,即mini-RabbitMQ server。其内部均含有独立的 queue、exchange和binding等,
其拥有独立的权限系统,可以做到 vhost 范围的用户控制。
vhost可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的vhost中)。
消息路由
AMQP 中的消息路由
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
AMQP 的消息路由过程
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct
原理图
direct 交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
fanout
fanout 交换器
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic
topic 交换器
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
集群
集群模式
图
主备模式:实现rabbitMQ高可用集群,一般在并发量和数据不大的情况下,这种模式好用简单。又称warren模式。(区别于主从模式,主从模式主节点提供写操作,从节点提供读操作,主备模式从节点不提供任何读写操作,只做备份)如果主节点宕机备份从节点会自动切换成主节点,提供服务。
集群模式:经典方式就是Mirror模式,保证100%数据不丢失,实现起来也是比较简单。
镜像队列,是rabbitMQ数据高可用的解决方案,主要是实现数据同步,一般来说是由2-3节点实现数据同步,(对于100%消息可靠性解决方案一般是3个节点)
延迟消息队列 两种方式
通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。
文章链接
RabbitMQ 总结
rocketMQ
架构图
tu
消息模型
图
RocketMQ 使用的消息模型是标准的发布 - 订阅模型
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。
RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
订阅者的概念是通过消费组(Consumer Group)来体现的。
每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
组成部分
生产者:不会保存消息发送状态,本身无状态,可以scale out扩展
消费者:
1)集群消费模式下会保存消费进度,并定期更新到broker,本地不存储,重启时到broker上获取最新消费进度,相
当于无状态,可以集群部署。
2)广播消费模式下,每个consumer消费进度保存到本地文件系统,启动时从本地加载消费进度,但是各个
consumer之间消费进度互不影响,相当于share nothing,故也可以扩展,但是不同的consumer可能会重复消费。
Name sever:
1)各个name server之间没有通信,一个宕机不影响其他,可以集群部署。
2)Name server的扩容可以通过单独的http服务器来维护所有namesrv地址列表,让producer和consumer以及broker
集群动态感知最新的namesrc上下线。(http://jmenv.tbsite.net:8080/rocketmq/nsaddr) 但是这样增加了系统的复杂度和运维负担。
3)个人认为一个更好的方案是通过zk来动态扩容(类似于dubbo的服务发现)
Broker:
1)每个broker保存topic的部分信息,彼此之间无共享,可以通过部署多台broker来分担负载。
2)broker扩容通过name server来实现,新加入的broker会把自己注册到name server,这样producer和consumer就可
以从name server获取到最新的broker信息
3)集群方案:单master方案有单点;多master当某个master宕机会对写没有影响,但是该master上的消息再恢复前
不能读,影响消息实时性;多master多slave方式,异步复制下会有丢消息可能,同步复制下不会丢消息但性能略低。
工作流程
启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
Broker启动,跟所有的Namesrv保持***长连接***,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
消费模式
集群消费 CLUSTERING
一条消息只会被同Group中的一个Consumer消费
多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
默认是 CLUSTERING 模式,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。
广播消费BROADCASTING
消息将对一个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
消息重复消费
出现原因
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除
当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer。
消费模式:在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次
解决方案
数据库表:处理消息前,使用消息主键在表中带有约束的字段中insert
Map:单机时可以使用map做限制,消费时查询当前消息id是不是已经存在
Redis:使用分布式锁。
顺序消费
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。
可以使用同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
消息丢失
Producer端
采取send()同步发消息,发送结果是同步感知的。
发送失败后可以重试,设置重试次数。默认3次。
Broker端
修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
集群部署
Consumer端
完全消费正常后在进行手动ack确认
分布式事务
1、生产者向MQ服务器发送half消息。
2、half消息发送成功后,MQ服务器返回确认消息给生产者。
3、生产者开始执行本地事务。
4、根据本地事务执行的结果(UNKNOW、commit、rollback)向MQ Server发送提交或回滚消息。
5、如果错过了(可能因为网络异常、生产者突然宕机等导致的异常情况)提交/回滚消息,则MQ服务器将向同一组中的每个生产者发送回查消息以获取事务状态。
6、回查生产者本地事物状态。
7、生产者根据本地事务状态发送提交/回滚消息。
8、MQ服务器将丢弃回滚的消息,但已提交(进行过二次确认的half消息)的消息将投递给消费者进行消费。
Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
超时:如果超过回查次数,默认回滚消息。
也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。
消息堆积
1、如果可以添加消费者解决,就添加消费者的数据量
2、如果出现了queue,但是消费者多的情况。可以使用准备一个临时的topic,同时创建一些queue,在临时创建一个消费者来把这些消息转移到topic中,让消费者消费。