导图社区 ActiveMQ思维导图
这是一篇关于ActiveMQ的思维导图。ActiveMQ全助记,囊括基本概念、部署、JMS 标准、传输协议、持久化、高级特性等。
编辑于2021-08-26 23:50:02ActiveMQ
特性 ActiveMQ RabbitMQ Kafka RocketMQ PRODUCER-CUMSUMER 支持 支持 支持 支持 PUBLISH-SUBSCRIBE 支持 支持 支持 支持 REQUEST-REPLY 支持 支持 - 支持 API完备性 高 高 高 低(静态配置) 多语言支持 支持,Java优先 语言无关 支持,Java优先 支持 单机吞吐量 万级 万级 十万级 单机万级 消息延迟 - 微秒级 毫秒级 - 可用性 高(主从) 高(主从) 非常高(分布式) 高 消息丢失 - 低 理论上不会丢失 - 消息重复 - 可控制 理论上会有重复 - 文档的完备性 高 高 高 中 提供快速入门 有 有 有 无 首次部署难度 - 低 中 高 特性: 事务、持久化、签收
Concept
作用
1. 异步,调用者无需等待 2. 解耦,消除系统之间耦合调用 3. 消峰,抵御高并发流量,保护主业务 弹性伸缩,冗余存储,数据同步
特点
1. 异步 2. 解耦
缺点
Producer 和 Consumer 不能同步实时调用
Broker
ActiveMQ 服务器实例
Install
解压直接启动
命令
# 启动 bin/activemq start # 停止 bin/activemq stop # 重启 bin/activemq restart
WebConsole
---本机以外访问 WebConsole--- # 修改 jetty.xml 中 host 127.0.0.1 -> hostname/ip <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="docker"/> <property name="port" value="8161"/> </bean> WebConsole 地址:http://hostname:8161 user/password: admin/admin
Queues
Number Of Pending Messages: 等待消费的消息数 = 总接收数 - 总出队列数 Number Of Consumers: 消费者数量 Messages Enqueued: 进队消息数,包括已出队列的,只增不减 Messages Dequeued: 出兑消息数,消费者消费掉的消息数
查看状态
# 查看进程 ps -ef | grep activemq # 查看端口号 netstat -anp | grep 61616 # 查看端口号 # yum install lsof lsof -i:61616
多节点集群
基于 shareFileSystem 共享文件系统(KahaDB)
基于 JDBC
基于可复制的 LevelDB
Zookeeper+Replicated LevelDB 主从集群
JMS
Java Message Service  ---JavaEE--- JavaEE是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。 JavaEE平台提供了一个基于组件的方法来加快设计,开发。装配及部署企业应用程序。 1,JDBC(Java Databease)数据库连接 2,JNDI(Java Naming and Directory Interfaces)Java的命令和目录接口 3,EJB(Enterprise JavaBean) 4,RMI(Remote Method Invoke)远程方法调用 5,Java IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)接口定义语言/共用对象请求代理程序体系结构 6,JSP(Java Server Page) 7,Servlet 8,XML(Extensible Markup Language)可标记白标记语言 9,JMS(Java Message Service)Java消息服务 10,JTA(Java Transaction API)Java事务API 11,JTS(Java Transaction Service)Java事务服务 12,JavaMail 13,JAF(JavaBean Activation Framework)
JMS Provider
实现 JMS 接口和规范的消息中间件,MQ 服务器
JMS Producer
消息生产者,创建和发送 JMS 消息的客户端应用
JMS Consumer
消息消费者,接收和处理 JMS 消息的客户端应用
JMS Message
Header
JMS Destination
消息发送的目的地,主要指 Queue 和 Topic Sender -> Queue -> Receiver Publisher -> Topic -> Subscriber
Queue
---特点--- 1. 1 对 1,每个 Message(非 Queue)只能有一个 Consumer 2. Producer 与 Consumer 时间上无关,后来的 Consumer 可以消费之前进队的消息 3. 消息只能消费一次,被消费后不会再被存储 4. 多个消费者消费一个 Queue 的 消息,默认平摊? ---两种消费方式--- 1. 同步阻塞:receive,在能接收到消息前一直阻塞 2. 异步非阻塞:MessageListener::onMessage,当消息到达时回调
Topic
---特点--- 1. 1 对 N,每个 Message 可以有多个 Consumer 2. Producer 与 Consumer 默认时间上相关,Consumer 只能消费订阅之后发布的 Message 3. Topic 默认不保证废消息(无人订阅的消息)的不落地,需自行保证先启动订阅者,在启动发布者 JMS 允许客户穿件持久订阅,即允许消费者消费到之前发布的消息
Queue vs. Topic
1. 工作模式: Queue 属于 "负载均衡" 模式;Topic 属于 "发布/订阅" 模式 2. 有无状态: Queue 默认将数据保存至文件,data/kahadb,可以配置至 Database;Topic 无状态 3. 传递完整性: Queue 消息即时当前无消费者,消息不会丢弃;Topic 如果没有订阅者,消息会被丢弃 4. 处理效率: Queue 的消费因为只被消费一次,几乎不受消费者多少影响;Topic 的消息被所有订阅者复制消费,性能随着订阅者增加而明显下降,且受消息协议自身影响
JMS Delivery Mode
持久模式。 持久性的消息:传递一次且仅一次,即使 JMS 服务器出现故障,该消息也不会丢失,在服务器恢复之后可继续传递 非持久的消息:最多传递一次,服务器出现故障时,该消息将永远丢失
JMS Expiration
设置消息的过期时间,如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除 默认永不过期,等于 Destination 的 send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值 如果 timeToLive 值等于0,则 JMSExpiration 被设为0,表示该消息永不过期
JMS Priority
消息优先级,默认是 4 级 从 0-9 十个级别,0-4 是普通消息 5-9 是加急消息 JMS 不要求 MQ 严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。
JMS Message ID
每条消息的唯一标识
Content
TxtMessage
String,普通字符串消息
MapMessage
Map,key 为 String,value 为 Java 基本类型
BytesMessage
byte[]
StreamMessage
数据流,使用标准流操作填充和读取数据
ObjectMessage
可序列化的 Java 对象
Property
消息以外的属性,如用于识别、去重、重点标记等操作的标记
Persistent
持久性 非持久:服务器宕机,消息不存在 持久化:服务器宕机,消息依然存在
Queue
默认是持久
Topic
默认非持久化,此时消费者只能消费自己上线后发布的消息 持久化:主要消费者上线,就可以消费所有发布成功的消息,不管是 MQ 宕机还是上线前的消息 先启动订阅者,再启动生产者
Transaction
事务,偏生产者 生产者开启事务,只有执行 commit 才会真正提交消息,rollback 会回滚消息 消费者开启事务,只有执行 commit 才会真正标记消息已消费,rollback 可违反消息只能被消费一次的规则
Producer
false
关闭事务,只执行 send,消息就会进入队列 签收参数的设置需要有效
true
启用事务,需要先执行 send 再执行 commit,消息才会真正被提交到队列,此时签收机制不再重要 消息会批量提交,缓存处理
Acknowledge
签收,偏消费者 AUTO_ACKNOWLEDGE - 自动签收,默认方式 CLIENT_ACKNOWLEDGE - 手动签收,手动调用 Message.acknowledge() DUPS_OK_ACKNOWLEDGE - 允许重复签收,多线程或多个消费者因为线程不安全,可能会重复消费 SESSION_TRANSACTED
Spring
Transport Protocol
TCP(Transmission Control Protocol),默认
端口:61616 优点: 1. 传输可靠性搞,稳定性强 2. 序列化为字节流传递,效率较高 3. 应用广泛,支持任意平台
NIO(New I/O API Protocol)
网络延迟环境下,比 TCP 性能好 更多的 Client 调用和服务端负载,解放操作系统线程限制,消耗更少的线程
配置
# 添加 nio 连接器,在 activemq.xml 中添加 <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
应用到所有协议
# 在 activemq.xml 中增加 <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/> - auto: 针对所有协议,自动识别使用的协议 - nio: 使用 NIO 网络 IO 模型
AMQP(Advanced Message Queuing Protocol)
一种统一消息服务的应用层通信协议,基于此协议客户端和消息中间件的消息传递不再受不同产品,不同语言等条件限制
STOMP(Streaming Text Orientated Message protocol)
流文本定向消息协议,简单文本协议
SSL(Secure Sockets Layer Protocol)
MQTT(Message Queuing Telemetry Transport)
消息队列遥测传输,一种即时通信协议,物联网中传感器和致动器的通信协议
WS(WebSocket)
Persistent
---Queue--- 当 DeliveryMode 设置为 non_persistence 时,消息保存在内存中 当 DeliveryMode 设置为 persistence 时,消息保存在相应文件或数据库中,消息一旦被消费,消息会被删除 ---Topic--- 当 DeliveryMode 设置为 persistence 时,订阅者(avtivemq_acks)和消息(activemq_msgs)会被永久保存,通过 avtivemq_acks 中订阅者最后消费的消息ID(last_ack_id)可以知道消费者消费到的位置 ---总结--- 1. jdbc 效率低,KahaDB 效率高,jdbc+journal 效率高 2. 持久化主要目的:防止 MQ 宕机时消息丢失
AMQ Message Store
基于文件的存储机制,写入速度快,容易恢复; 5.3 之前的默认机制,现已不再使用 文件默认大小 32MB,一个文件的消息被全部消费时被标记为可删除
KahaDB(默认)
5.4 之后默认存储方式,提高了性能和恢复能力 使用一个事务日志和一个全局索引文件存储
原理
1. db-1.log,存储消息,当文件达到预定大小,会创建新的文件 db-2.log,预定大小默认是 32MB,当文件中的数据不再被引用时,文件会被删除或归档 2. db.data,持久化的 BTree,索引 db-1.log 中存储的消息 3. db.free,记录 db.data 中空闲页的 ID 4. db.redo,用于重做 BTree 索引,恢复数据 5. lock,文件锁,记录当前获得 KahaDB 读写权限的 Broker
JDBC
MySQL 案例
1. 添加 mysql 驱动至 lib 2. 修改 activemq.xml 配置项 <persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kdhadb"/> --> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/> <!-- dataSource 指向数据源定义 bean 的名称 #mysql-ds --> <!-- createTableOnStartup 指定是否在启动时创建相关表,只在第一启动时设置为 true --> </persistenceAdapter> 3. 配置数据源定义 bean,插入 </broker> 与 <import> 标签之间 <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://mysql数据库URL/activemq?relaxAutoCommit=true"/> <property name="username" value="mysql数据库用户名"/> <property name="password" value="mysql数据库密码"/> <property name="poolPreparedStatements" value="true"/> </bean> 4. 建库,重启 ActiveMQ,自动生成数据库表
ACTIVEMQ_MSGS
消息表,Queue 和 Topic 都存在里面,结构如下: id: bigint(20) 自增数据库主键 container: varchar(250) 消息的 destination msgid_prod: varhcar(250) 消息发送者的主键 msgid_seq: bigint(20) 消息发送的顺序,msgid_prod + msgid_seq 组合生成 JMS 的 MessageID expiration: bigint(20) 消息过期时间,时间戳 msg: blob 消息体,Java 对象序列化的二进制数据 priority: bigint(20) 优先级,0-9,数值越大优先级越高
ACTIVEMQ_ACKS
用于存储订阅关系,如果时持久化 Topic,订阅者和服务器的关系在此保存,结构如下: container: varchar(250) 消息的 destination sub_dest: varchar(250) 如果使用 static 集群,这里存储集群其他系统信息 client_id: varchar(250) 订阅者的唯一ID sub_name: varchar(250) 订阅者名称 selector: varchar(250) 选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现 priority: bigint(20) default 5 优先级 last_acke_id bigint 记录消费过的消息的ID
ACTIVEMQ_LOCK
记录当前作为 Master 的 Broker 在集群环境下使用,集群中只有一个 Broker 可以获取消息,即 Master Broker 其他 Broker 只作为备份和备机,Master Broker 不可用时,成为下一个 Master Broker id: bigint(20) 自增主键 time: bigint(20) broker_name: varchar(250)
LevelDB
ActiveMQ 5.8 之后引进,和 KahaDB 相似,提供比 KahaDB 更快的持久性,也是基于文件的本地数据库存储形式,但不使用 BTree 索引,而是使用 LevelDB 但索引 5.9 提供 Zookeeper + LevelDB 集群化方案
JDBC Message Store with ActiveMQ Journal
高速缓存写入技术,克服 JDBC Store 写库读库的不足 当消费者处理消息速度较快时,直接从 Journal 消费掉消息,不用写入数据库 当消费者处理消息速度较慢时,Journal 以批处理的方式将数据写入数据库,待消费者消费
Advanced Feature
使用场景
配置生成环境的连接协议
ActiveMQ 持久化机制
引入消息中间件后如何保证高可用
异步投递
延迟投递和定时投递
分发策略
消息重试机制
死信队列
如何保证消息不被重复消费,幂等性问题