导图社区 RabbitMQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用
编辑于2022-02-24 19:57:20Linux是一套不用付费使用和自由传播的类Unix操作系统,是一个基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。专业性强,主要用于服务器,嵌入式系统,移 动终端系统等
Thymeleaf提供了一个用于整合Spring MVC的可选模块,在应用开发中,你可以使用Thymeleaf来完全代替JSP或其他模板引擎,如Velocity、FreeMarker等。Thymeleaf的主要目标在于提供一种可被浏览器正确显示的、格式良好的模板创建方式,因此也可以用作静态建模。你可以使用它创建经过验证的XML与HTML模板。相对于编写逻辑或代码,开发者只需将标签属性添加到模板中即可。
这是一篇关于Nginx的思维导图,主要内容有1.目标、2.Nginx的安装与启动、3.Nginx静态网站部署、4.Nginx反向代理与负载均衡。
社区模板帮助中心,点此进入>>
Linux是一套不用付费使用和自由传播的类Unix操作系统,是一个基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。专业性强,主要用于服务器,嵌入式系统,移 动终端系统等
Thymeleaf提供了一个用于整合Spring MVC的可选模块,在应用开发中,你可以使用Thymeleaf来完全代替JSP或其他模板引擎,如Velocity、FreeMarker等。Thymeleaf的主要目标在于提供一种可被浏览器正确显示的、格式良好的模板创建方式,因此也可以用作静态建模。你可以使用它创建经过验证的XML与HTML模板。相对于编写逻辑或代码,开发者只需将标签属性添加到模板中即可。
这是一篇关于Nginx的思维导图,主要内容有1.目标、2.Nginx的安装与启动、3.Nginx静态网站部署、4.Nginx反向代理与负载均衡。
RabbitMQ
0. 学习目标
能够说出什么是消息队列
能够安装RabbitMQ
能够编写RabbitMQ的入门程序
能够说出RabbitMQ的5种模式特
能够使用SpringBoot整合RabbitMQ
项目地址
1. 消息队列概述
1.1. 消息队列MQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法
为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
1、任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
1.2. AMQP 和 JMS
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
1.2.1. AMQP
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
1.2.2. JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
1.2.3. AMQP 与 JMS 区别
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
1.3. 消息队列产品
市场上常见的消息队列有如下:
ActiveMQ:基于JMS
ZeroMQ:基于C语言开发
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里巴巴产品
Kafka:类似MQ的产品;分布式消息系统,高吞吐量
1.4. RabbitMQ
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;不作介绍);
官网对应模式介绍
2. 安装及配置RabbitMQ
目标
按照文档在本机安装windows版本RabbitMQ,并配置其用户和Virtual Hosts
分析
1. 安装erlang;
2. 安装rabbitMQ;
3. 安装RabbitMQ的图形管理界面插件;
4. 创建管理用户;
5. 创建虚拟主机Virtual Hosts
小结
安装上述的组件时候都需要使用以管理员身份运行。
2.1. 安装说明
1. 说明
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(OpenTelecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。
2. 安装erlang
下载地址
以管理员身份运行此文件进行安装
找到 资料\软件\otp_win64_20.3.exe 右击选择 以管理员身份运行
默认安装
erlang安装完成需要配置erlang 系统环境变量: ERLANG_HOME=C:\Program Files\erl9.3 在path中添加%ERLANG_HOME%\bin;
3. 安装rabbitMQ
下载地址
以管理员身份运行此文件进行安装。安装完成后可以在系统服务中查看到RabbitMQ服务。
找到 资料\软件\rabbitmq-server-3.7.14.exe 右击选择 以管理员身份运行 。
默认安装
4. 配置插件
为了更加方便的管理RabbitMQ服务,可以安装RabbitMQ提供的一个浏览器端管理插件,可以通过浏览器页面方便的进行服务管理。
安装方式:
1、以管理员身份打开 cmd (不是PowerShell);然后进入在RabbitMQ的安装目录下sbin目录
2、在上述窗口执行命令:
rabbitmq-plugins.bat enable rabbitmq_management
3、打开浏览器访问网站http://localhost:15672进入登录页面,默认账号和密码都为guest
http://localhost:15672
guest
注意
1、安装erlang和RabbitMQ时都需要以管理员身份进行安装
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang,方法为搜索RabbitMQ、ErlSrv,将对应的项全部删除。
2.2. 用户以及Virtual Hosts配置
2.2.1. 用户角色
RabbitMQ在安装好后,可以访问http://localhost:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
http://localhost:15672
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者
2.2.2. Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
1. 创建Virtual Hosts
2. 设置Virtual Hosts权限
3. RabbitMQ入门
3.1. 搭建示例工程
目标
搭建RabbitMQ入门工程并配置对应的maven依赖
分析
创建wry-rabbitmq的工程;用于测试RabbitMQ的消息收发。添加用于操作RabbitMQ的依赖。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
小结
使用IDEA创建maven工程;使用了jdk1.8。在工程中的pom.xml文件中添加了上述的依赖。
3.1.1. 创建工程
创建父工程
创建生产者模块
3.1.2. 添加依赖
的pom.xml文件中添加如下依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
3.2. 编写生产者
目标
:编写消息生产者代码,发送消息到队列
分析
入门工程:生产者发送消息到RabbitMQ的队列(simple_queue);消费者可以从队列中获取消息。可以使用RabbitMQ的简单模式(simple)。
生产者实现发送消息的步骤:
1. 创建连接工厂(设置RabbitMQ的连接参数);
2. 创建连接;
3. 创建频道;
4. 声明队列;
5. 发送消息;
6. 关闭资源
小结
在设置连接工厂的时候;如果没有指定连接的参数则会有默认值;可以去设置虚拟主机。
package com.wry.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author wry * @date 2022/2/7 * @apiNote 简单模式:发送消息 */ public class Producer { static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 1. 创建连接工厂(设置RabbitMQ的连接参数); ConnectionFactory connectionFactory = new ConnectionFactory(); /* // 主机:默认localhost connectionFactory.setHost("localhost"); // 连接端口: 默认5672 connectionFactory.setPort(15672); // 虚拟主机: 默认/ connectionFactory.setVirtualHost("/wryVirtualHost"); // 用户名: 默认guest connectionFactory.setUsername("wry"); // 密码: 默认guest connectionFactory.setPassword("wry");*/ // 简洁 factory.setUri("amqp://username:password@hostName:portNumber/virtualHost"); connectionFactory.setUri("amqp://wry:wry@localhost:15672/wryVirtualHost"); // 2. 创建连接; Connection connection = connectionFactory.newConnection(); // 3. 创建频道; Channel channel = connection.createChannel(); // 4. 声明队列; channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 5. 发送消息; String message = "你好,RabbitMq!--wry"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("已发送消息:"+message); // 6. 关闭资源; connection.close(); } }
编写消息生产者com.wry.rabbitmq.simple.Producer
package com.wry.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author wry * @date 2022/2/7 * @apiNote 简单模式:发送消息 */ public class Producer { static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 1. 创建连接工厂(设置RabbitMQ的连接参数); ConnectionFactory connectionFactory = new ConnectionFactory(); /* // 主机:默认localhost connectionFactory.setHost("localhost"); // 连接端口: 默认5672 connectionFactory.setPort(15672); // 虚拟主机: 默认/ connectionFactory.setVirtualHost("/wryVirtualHost"); // 用户名: 默认guest connectionFactory.setUsername("wry"); // 密码: 默认guest connectionFactory.setPassword("wry");*/ // 简洁 factory.setUri("amqp://username:password@hostName:portNumber/virtualHost"); connectionFactory.setUri("amqp://wry:wry@localhost:15672/wryVirtualHost"); // 2. 创建连接; Connection connection = connectionFactory.newConnection(); // 3. 创建频道; Channel channel = connection.createChannel(); // 4. 声明队列; channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 5. 发送消息; String message = "你好,RabbitMq!--wry"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("已发送消息:"+message); // 6. 关闭资源; connection.close(); } }
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
3.3. 编写消费者
目标
编写消息消费者代码,从队列中接收消息并消费
分析
从RabbitMQ中队列(与生产者发送消息时的队列一致;simple_queue)接收消息;
实现消费者步骤:
1. 创建连接工厂;
2. 创建连接;(抽取一个获取连接的工具类)
3. 创建频道;
4. 声明队列;
5. 创建消费者(接收消息并处理消息);
6. 监听队列
小结
需要持续监听队列消息,所以不要关闭资源
抽取创建connection的工具类com.wry.rabbitmq.util.ConnectionUtil;
package com.wry.rabbitmq.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; /** * @author wry * @date 2022/2/21 * @apiNote */ public class ConnectionUtil { /** * 创建连接 * @return 连接 * @throws Exception */ public static Connection getConnection() throws Exception { // 1. 创建连接工厂(设置RabbitMQ的连接参数); ConnectionFactory connectionFactory = new ConnectionFactory(); /* // 主机:默认localhost connectionFactory.setHost("localhost"); // 连接端口: 默认5672 connectionFactory.setPort(15672); // 虚拟主机: 默认/ connectionFactory.setVirtualHost("/wryVirtualHost"); // 用户名: 默认guest connectionFactory.setUsername("wry"); // 密码: 默认guest connectionFactory.setPassword("wry");*/ // 简洁 factory.setUri("amqp://username:password@hostName:portNumber/virtualHost"); connectionFactory.setUri("amqp://wry:wry@localhost:15672/wryVirtualHost"); // 2. 创建连接; return connectionFactory.newConnection(); } }
编写消息的消费者com.wry.rabbitmq.simple.Consumer
package com.wry.rabbitmq.simple; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/21 * @apiNote 简单模式:消费者接受消息 */ public class Consumer { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; Channel channel = connection.createChannel(); // 声明队列; channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link com.rabbitmq.client.Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.QUEUE_NAME,true, defaultConsumer); } }
测试
目标
启动消费者和生产者,到RabbitMQ中查询队列并在消费者端IDEA控制台查看接收到的消息
分析
生产者:发送消息到RabbitMQ队列(simple_queue)
消费者:接收RabbitMQ队列消息
小结
简单模式:生产者发送消息到队列中,一个消费者从队列中接收消息。
在RabbitMQ中消费者只能从队列接收消息。
如果接收消息的消费者在同一个队列中有两个或多个时;消息是如何分配的?
启动provider
启动cunsumer
3.4. 小结
简单模式:生产者发送消息到队列中,一个消费者从队列中接收消息。
在RabbitMQ中消费者只能从队列接收消息。
如果接收消息的消费者在同一个队列中有两个或多个时;消息是如何分配的?
上述的入门案例中中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
在rabbitMQ中消息者是一定要到某个消息队列中去获取消息的
4. RabbitMQ工作模式
4.1. Work queues工作队列模式
目标
编写生产者、消费者代码并测试了解Work queues工作队列模式的特点
分析
工作队列模式:在同一个队列中可以有多个消费者,消费者之间对于消息的接收是竞争关系。
生产者:发送30个消息
消费者:创建两个消费者监听同一个队列,查看两个消费者的接收消息是否存在重复。
小结
工作队列模式:一个消息只能被一个消费者接收,其它消费者是不能接收到同一条消息的。
用场景:可以在消费者端处理任务比较耗时的时候;添加对同一个队列的消费者来提高任务处理能力。
4.1.1. 模式说明
Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
4.1.2. 代码
Work Queues 与入门程序的 简单模式 的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
1)生产者
package com.wry.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.wry.rabbitmq.util.ConnectionUtil; /** * @author wry * @date 2022/2/22 * @apiNote 工作队列模式:生产者发送消息 */ public class Producer { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 3. 创建频道; Channel channel = connection.createChannel(); // 4. 声明队列; channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 循环 for (int i = 0; i < 30; i++) { // 5. 发送消息; String message = "你好,RabbitMq!--wry" + i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("已发送消息:"+message); } // 6. 关闭资源; connection.close(); } }
2)消费者1
package com.wry.rabbitmq.work; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 工作队列模式:消费者1接受消息 */ public class Consumer1 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明队列; channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); // 每次可以预取多少个消息 channel.basicQos(1); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link com.rabbitmq.client.Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者1--接收到的消息:" + new String(body,"utf-8")); // 延迟1秒 Thread.sleep(1000); // 确认消息 channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 监听队列 channel.basicConsume(Producer.QUEUE_NAME,true, defaultConsumer); } }
3)消费者2
package com.wry.rabbitmq.work; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 工作队列模式:消费者2接受消息 */ public class Consumer2 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明队列; channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); // 每次可以预取多少个消息 channel.basicQos(1); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者2--接收到的消息:" + new String(body,"utf-8")); // 延迟1秒 Thread.sleep(1000); // 确认消息 channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 监听队列 channel.basicConsume(Producer.QUEUE_NAME,true, defaultConsumer); } }
4.1.3. 测试
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
4.1.4. 小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
4.2. 订阅模式类型
目标
说出订阅模式中的Exchange交换机作用以及交换机的三种类型
小结
订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。
交换机类型:广播(fanout)、定向(direct)、通配符(topic)
交换机只做消息转发,自身不存储数据。
订阅模式示例图:
前面2个案例中,只有3个角色:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
4.3. Publish/Subscribe发布与订阅模式
目标
编写生产者、消费者代码并测试了解Publish/Subscribe发布与订阅模式的特点
分析
发布与订阅模式特点:一个消息可以被多个消费者接收;其实是使用了订阅模式,交换机类型为:fanout广播
生产者(发送10个消息)
1. 创建连接;
2. 创建频道;
3. 声明交换机(fanout);
4. 声明队列;
5. 队列绑定到交换机;
6. 发送消息;
7. 关闭资源
消费者(至少两个消费者)
1. 创建连接;
2. 创建频道;
3. 声明交换机;
4. 声明队列;
5. 队列绑定到交换机;
6. 创建消费者;
7. 监听队列;
小结
发布与订阅模式:**一个消息可以被多个消费者接收**;一个消费者对于的队列,该队列只能被一个消费者监听。使用了订阅模式中交换机类型为:广播。
4.3.1. 模式说明
发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
4.3.2. 代码
1)生产者
package com.wry.rabbitmq.ps; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wry.rabbitmq.util.ConnectionUtil; /** * @author wry * @date 2022/2/22 * @apiNote 发布与订阅模式:生产者发送消息 */ public class Producer { // 交换机名称 static final String FANOUT_EXCHANGE = "fanout_exchange"; // 队列名称 static final String FANOUT_QUEUE_1 = "fanout_queue_1"; static final String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 2. 创建频道; Channel channel = connection.createChannel(); // 3.声明交换机 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 4. 声明队列; channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null); channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null); // 5.队列绑定到交换机 channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,""); channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,""); // 循环 for (int i = 0; i < 10; i++) { // 6. 发送消息; String message = "你好,RabbitMq!--wry" + i; channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes()); System.out.println("已发送消息:"+message); } // 7. 关闭资源; channel.close(); connection.close(); } }
2)消费者1
package com.wry.rabbitmq.ps; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 发布与订阅模式:消费者1接受消息 */ public class Consumer1 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 声明队列; channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null); // 队列绑定到交换机 channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,""); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者1--接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.FANOUT_QUEUE_1,true, defaultConsumer); } }
3)消费者2
package com.wry.rabbitmq.ps; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 发布与订阅模式:消费者2接受消息 */ public class Consumer2 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 声明队列; channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null); // 队列绑定到交换机 channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,""); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者2--接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.FANOUT_QUEUE_2,true, defaultConsumer); } }
4.3.3. 测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广 播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
4.3.4. 小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
4.4. Routing路由模式
目标
编写生产者、消费者代码并测试了解Routing路由模式的特点
分析
生产者:发送两条消息(路由key分别为:insert、update)
消费者:创建两个消费者,监听的队列分别绑定路由key为:insert、update
消息中路由key为insert的会被绑定路由key为insert的队列接收并被其监听的消费者接收、处理;
消息中路由key为update的会被绑定路由key为update的队列接收并被其监听的消费者接收、处理;
小结
Routing 路面模式要求队列绑定到交换机的时候指定路由key;消费发送时候需要携带路由key;只有消息的路由key与队列路由key完全一致才能让该队列接收到消息。
4.4.1. 模式说明
路由模式特点:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
4.4.2. 代码
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
1)生产者
package com.wry.rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wry.rabbitmq.util.ConnectionUtil; /** * @author wry * @date 2022/2/23 * @apiNote 路由模式:生产者发送消息 */ public class Producer { // 交换机名称 static final String DIRECT_EXCHANGE = "direct_exchange"; // 队列名称 static final String DIRECT_QUEUE_INSERT = "direct_queue_insert"; static final String DIRECT_QUEUE_UPDATE = "direct_queue_update"; public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 2. 创建频道; Channel channel = connection.createChannel(); // 3.声明交换机 channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); // 4. 声明队列; channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null); channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null); // 5.队列绑定到交换机 channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHANGE,"insert"); channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHANGE,"update"); // 6. 发送消息; String message = "你好,RabbitMq!路由模式,routingKey为insert--wry"; channel.basicPublish(DIRECT_EXCHANGE,"insert",null,message.getBytes()); System.out.println("已发送消息:"+message); message = "你好,RabbitMq!路由模式,routingKey为update--wry"; channel.basicPublish(DIRECT_EXCHANGE,"update",null,message.getBytes()); System.out.println("已发送消息:"+message); // 7. 关闭资源; channel.close(); connection.close(); } }
2)消费者1
package com.wry.rabbitmq.routing; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 发布与订阅模式:消费者1接受消息 */ public class Consumer1 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明队列; channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT,true,false,false,null); // 队列绑定到交换机 channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE,"insert"); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者1--接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.DIRECT_QUEUE_INSERT,true, defaultConsumer); } }
3)消费者2
package com.wry.rabbitmq.routing; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 发布与订阅模式:消费者2接受消息 */ public class Consumer2 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明队列; channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE,true,false,false,null); // 队列绑定到交换机 channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHANGE,"update"); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者2--接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE,true, defaultConsumer); } }
4.4.3. 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 direct_exchange 的交换机,可以查看到如下的绑定:
4.4.4. 小结
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
4.5. Topics通配符模式
目标
编写生产者、消费者代码并测试了解Topics通配符模式的特点
分析
生产者:发送包含有item.insert、item.update,item.delete的3中路由key消息
消费者1:监听的队列绑定到交换机的路由key为:item.update,item.delete
消费者2:监听的队列绑定到交换机的路由key为:item.*
小结
Topics通配符模式:可以根据路由key将消息传递到对应路由key的队列;队列绑定到交换机的路由key可以有多个;通配符模式中路由key可以使用 `*` 和 `#` ;使用了通配符模式之后对于路由Key的配置更加灵活。
4.5.1. 模式说明
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
# :匹配一个或多个词
* :匹配不多不少恰好1个词
举例:
item.# :能够匹配 item.insert.abc 或者 item.insert
item.* :只能匹配 item.insert
图解:
红色Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
4.5.2. 代码
1)生产者
使用topic类型的Exchange,发送消息的routing key有3种: item.insert 、 item.update 、 item.delete :
package com.wry.rabbitmq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.wry.rabbitmq.util.ConnectionUtil; /** * @author wry * @date 2022/2/23 * @apiNote 通配符模式:生产者发送消息 */ public class Producer { // 交换机名称 static final String TOPIC_EXCHANGE = "topic_exchange"; // 队列名称 static final String TOPIC_QUEUE_1 = "topic_queue_1"; static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); // 发送消息; String message = "你好,RabbitMq!通配符模式,routingKey为item.insert--wry"; channel.basicPublish(TOPIC_EXCHANGE,"item.insert",null,message.getBytes()); System.out.println("已发送消息:"+message); message = "你好,RabbitMq!通配符模式,routingKey为item.update--wry"; channel.basicPublish(TOPIC_EXCHANGE,"item.update",null,message.getBytes()); System.out.println("已发送消息:"+message); message = "你好,RabbitMq!通配符模式,routingKey为item.delete--wry"; channel.basicPublish(TOPIC_EXCHANGE,"item.delete",null,message.getBytes()); System.out.println("已发送消息:"+message); // 关闭资源; channel.close(); connection.close(); } }
2)消费者1
接收两种类型的消息:更新商品和删除商品
package com.wry.rabbitmq.topic; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 通配符模式:消费者1接受消息 */ public class Consumer1 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); // 声明队列; channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null); // 队列绑定到交换机 channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"item.update"); channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"item.delete"); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者1--接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.TOPIC_QUEUE_1,true, defaultConsumer); } }
3)消费者2
接收所有类型的消息:新增商品,更新商品和删除商品。
package com.wry.rabbitmq.topic; import com.rabbitmq.client.*; import com.wry.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * @author wry * @date 2022/2/22 * @apiNote 通配符模式:消费者2接受消息 */ public class Consumer2 { public static void main(String[] args) throws Exception { // 创建连接工厂; // 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); // 创建频道; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); // 声明队列; channel.queueDeclare(Producer.TOPIC_QUEUE_2,true,false,false,null); // 队列绑定到交换机 channel.queueBind(Producer.TOPIC_QUEUE_2,Producer.TOPIC_EXCHANGE,"item.*"); // 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * No-op implementation of {@link Consumer#handleDelivery}. */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 路由key System.out.println("路由key:" + envelope.getRoutingKey()); // 交换机 System.out.println("交换机:" + envelope.getExchange()); // 消息id System.out.println("消息id:" + envelope.getDeliveryTag()); // 接收到的消息 System.out.println("消费者2--接收到的消息:" + new String(body,"utf-8")); } }; // 监听队列 channel.basicConsume(Producer.TOPIC_QUEUE_2,true, defaultConsumer); } }
4.5.3. 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
在执行完测试代码后,其实到RabbitMQ的管理后台找到 Exchanges 选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:
4.5.4. 小结
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
4.6. 模式总结
目标
对比总结RabbitMQ的5种模式特征
小结
不直接Exchange交换机(默认交换机)
simple简单模式:一个生产者生产一个消息到一个队列被一个消费者接收
work工作队列模式:生产者发送消息到一个队列中,然后可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系
使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)
发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列
路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息
通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消
1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5. Spring Boot整合RabbitMQ
目标
创建springboot-rabbitmq-producer工程用于生产消息;创建springboot-rabbitmq-consumer工程用于接收消息
Spring Boot提供了对于AMQP的整合;可以使用RabbitTemplate发送消息;可以使用@RabbitListener注解接收消息。
分析
Spring Boot提供了对于AMQP的整合;可以使用RabbitTemplate发送消息;可以使用@RabbitListener注解接收消息。
**生产者工程springboot-rabbitmq-producer**:发送消息
1. 创建工程;
2. 添加依赖(spring-boot-stater-amqp,spring-boot-starter-test);
3. 创建启动引导类;
4. 添加配置文件application.yml
**消费者工程springboot-rabbitmq-consumer**:接收消息
1. 创建工程;
2. 添加依赖(spring-boot-stater-amqp);
3. 创建启动引导类;
4. 添加配置文件application.yml
小结
可以使用插件自动生产Spring Boot工程的启动引导类Application.java和配置文件application.yml
5.1. 简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp尤其是在spring boot项 只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
1. application.yml文件配置RabbitMQ相关信息;
2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
3. 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
1. application.yml文件配置RabbitMQ相关信息
2. 创建消息处理类,用于接收队列中的消息并进行处理
5.2. 搭建生产者工程
目标
配置springboot-rabbitmq-producer工程的RabbitMQ,一个交换机、队列并绑定
分析
使用通配符模式:将队列绑定到交换机(topic)时需要指定路由key(item.#)
配置RabbitMQ的连接参数:主机、连接端口、虚拟主机、用户名、密码;
声明交换机、队列并将队列绑定到交换机,指定的路由key(item.#)
5.2.1. 创建工程
创建生产者工程springboot-rabbitmq-producer
5.2.2. 添加依赖
修改pom.xml文件内容为如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> </parent> <groupId>com.wry.rabbitmq</groupId> <artifactId>springboot-rabbitmq-producer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
5.2.3. 启动类
package com.wry.rabbitmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author wry * @date 2022/2/23 * @apiNote 启动引导类 */ @SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class,args); } }
5.2.4. 配置RabbitMQ
1)配置文件
创建application.yml,内容如下:
spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: wryVirtualHost username: wry password: wry
2)绑定交换机和队列
创建RabbitMQ队列与交换机绑定的配置类
package com.wry.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wry * @date 2022/2/24 * @apiNote */ @Configuration public class RabbitMQConfig { // 交换机名称 public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange"; // 队列名称 public static final String ITEM_QUEUE = "item_queue"; // 声明交换机 @Bean("itemTopicExchange") public Exchange topicExchange() { return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build(); } // 声明队列 @Bean("itemQueue") public Queue itemQueue() { return QueueBuilder.durable(ITEM_QUEUE).build(); } // 将队列绑定到交换机 @Bean public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,@Qualifier("itemTopicExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } }
5.3. 搭建消费者工程
目标
配置springboot-rabbitmq-consumer工程的RabbitMQ,编写消息监听器接收消息
分析
1. 配置application.yml文件,设置RabbitMQ的连接参数;
2. 编写消息监听器接收队列(item_queue)消息;可以使用注解@RabbitListener接收队列消息
小结
- 配置application.yml文件;与生产者工程一致
- 编写监听器类
> 接收消息的队列名称要与生产者发送消息时的队列名称一致
5.3.1. 创建工程
创建消费者工程springboot-rabbitmq-consumer
5.3.2. 添加依赖
修改pom.xml文件内容为如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> </parent> <groupId>com.wry.rabbitmq</groupId> <artifactId>springboot-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
5.3.3. 启动类
package com.wry.rabbitmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
5.3.4. 配置RabbitMQ
创建application.yml,内容如下:
spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: wryVirtualHost username: wry password: wry
5.3.5. 消息监听处理类
编写消息监听器
package com.wry.rabbitmq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author wry * @date 2022/2/24 * @apiNote */ @Component public class MyListener { /** * 接受队列消息 * @param message 队列消息 */ @RabbitListener(queues = "item_queue") public void myListener1(String message) { System.out.println("消费者接收到的消息:" + message); } }
5.4. 测试
目标
生产者编写测试类RabbitMQTest发送消息到交换机和特定的路由(item.insert,item.update,item.delete)
分析
生产者:编写测试类RabbitMQTest,利用RabbitTemplate发送3条消息,这3条消息的路由key分别是item.insert,item.update,item.delete
消费者:在IDEA控制台查看是否能接收到符合路由key的消息
小结
先启动测试类进行声明交换机、队列和绑定;之后再启动消费者工程接收消息。
在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:
package com.wry.rabbitmq; import com.wry.rabbitmq.config.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @author wry * @date 2022/2/24 * @apiNote */ @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test() { rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert","商品,路由key为item.insert"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update","商品,路由key为item.update"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete","商品,路由key为item.delete"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "a.item.delete","商品,路由key为item.delete"); System.out.println("生产者发送消息"); } }
先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程springboot-rabbitmqconsumer中控制台查看是否接收到对应消息。
另外;也可以在RabbitMQ的管理控制台中查看到交换机与队列的绑定:
测试
启动测试程序
启动消费者工程的启动类