导图社区 SpringCloud-Stream
这是一篇关于SpringCloud-Stream的思维导图,主要内容有作用、整体结构、元素详解、常用配置。
编辑于2022-07-03 17:07:12SpringCloud-Stream
Spring Cloud会自动实现该Binding的实现,也会提供Binding接口的实现,并注册到bean容器中。即可以在程序中自动注入Source类型的bean,也可以注入MessageChannel类型的bean 默认在接收和发送消息时对应的消息格式类型都是JSON格式
作用
用于构建消息驱动的微服务应用程序的框架
整体结构
组件详解
inputs消费端
默认通道
Sink.java
使用方式
自定义通道
outputs生产端
通道
默认通道
Source.java
自定义通道
如图
代码引入方式
注入接口
注入通道
子主题
binding
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者
作用
作为消息中间件与应用程序的提供者和消费者之间的桥梁
Binder解耦绑定
通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。甚至可以任意的改变中间件的类型而不需要修改一行代码 Spring Cloud对消息容器的抽象,不同的消息容器有不同的实现,通过它可以屏蔽各消息容器的内部细节。
实现
Kafka 实现了
RabbitMQ 实现了
RocketMQ 实现了
作用
负责提供与外部消息系统集成的组件
消费组
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings.{channel-name}.group属性即可
partitions消息分区
目的:当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理 场景:在实际的应用场景中,我们需要将同一种类型的消息,比如同一个用户,或者同一个类型的日志消息始终由同一个消费者消费并做统计,这个时候就可以使用消息分区了。 个人理解:分区可以理解为rocketmq中broker的队列
元素详解
注解
Integration原生实现消息收发
@InboundChannelAdapter
@ServiceActivator
@Transformer
@EnableBinding
作用
指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定
@StreamListener
作用 实现消息监听 发出消息时,会包含一个String类型的payload和含有contentType的Header信息。@StreamListener能够根据发送消息时的头信息contentType自动完成转换处理 用法 1、 /*********************************User对象处理实现******************************/ @StreamListener(MySink.USER_CHANNEL) public void userReceive(@Payload User user, @Headers Map headers) { LOGGER.info(headers.get("contentType").toString()); LOGGER.info("Received from {} channel username: {}", MySink.USER_CHANNEL, user.getUsername()); } @StreamListener(value = MySink.USER_CHANNEL) public void userReceive(@Payload User user, @Headers Map headers, @Header(name = "name") Object name) { LOGGER.info(headers.get("contentType").toString()); LOGGER.info("name : {}", name.toString()); LOGGER.info("Received from {} channel username: {}", MySink.USER_CHANNEL, user.getUsername()); }
作用
主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名
属性解析
String value(target)
String target(value)
概要
默认空字符串,等同主题
String condition
支持SpEL表达式 // 监听头信息中有flag=aa键值对的消息 @StreamListener(value = MySink.USER_CHANNEL, condition = "headers['flag']=='aa'") public void userReceiveByHeader1(@Payload User user) { } // 监听头信息中有flag=bb键值对的消息 @StreamListener(value = MySink.USER_CHANNEL, condition = "headers['flag']=='bb'") public void userReceiveByHeader2(@Payload User user) { }
默认空字符串,条件
String copyHeaders
默认字符串“true”
@SendTo
说明 我们还可能会遇到一个场景就是,我们接收到消息后,给别人一个反馈ACK,SpringCloud stream 给我们提供了一个SendTo注解可以帮我们干这些事情
@SendTo注解将会将返回值发送至指定通道
@Output
对应的是org.springframework.messaging.MessageChannel
生产者
@Input
对应的是org.springframework.messaging.SubscribableChannel
消费者
@Payload
@StreamListener(value = MySink.USER_CHANNEL) public void userReceive(@Payload User user, @Headers Map headers, @Header(name = "name") Object name) { LOGGER.info(headers.get("contentType").toString()); LOGGER.info("name : {}", name.toString()); LOGGER.info("Received from {} channel username: {}", MySink.USER_CHANNEL, user.getUsername()); }
用于接收的消息体
@Headers
@StreamListener(value = MySink.USER_CHANNEL) public void userReceive(@Payload User user, @Headers Map headers, @Header(name = "name") Object name) { LOGGER.info(headers.get("contentType").toString()); LOGGER.info("name : {}", name.toString()); LOGGER.info("Received from {} channel username: {}", MySink.USER_CHANNEL, user.getUsername()); }
获取消息头信息,其为一个Map键值对
@Header
@StreamListener(value = MySink.USER_CHANNEL) public void userReceive(@Payload User user, @Headers Map headers, @Header(name = "name") Object name) { LOGGER.info(headers.get("contentType").toString()); LOGGER.info("name : {}", name.toString()); LOGGER.info("Received from {} channel username: {}", MySink.USER_CHANNEL, user.getUsername()); }
获取指定头信息
特殊类/接口
Processor接口(消息中转站)
MessageChannel
SubscribableChannel
MessageHeaders
Message
message 没有set方法, 所以message 是不能改变的
getPayload()
getHeaders()
MessageConverter
自定义MessageConverter
MessageHandler
处理消息的约定
PartitionKeyExtractorStrategy
自定义分区策略
常用配置
spring.cloud.stream
instanceCount=1
消费实例数量总数
instanceIndex=1
当前实例的索引值
分区配置
当我们在采用集群的方式部署同一个应用时,每一个实例都可以接受到同一个应用有多少个实例数量,以及当前自己的实例在集群中的索引。Stream通过 spring.cloud.stream.instanceCount 实例数量和 spring.cloud.stream.instanceIndex 当前的实例索引实现这一点。如果实例总数instanceCount 是3,那么instanceIndex 索引从0开始到1、2 ,这两个属性的正确配置对于解决分区行为非常的重要,可以用来确保消息在多个实例之间正确的分割。
dynamicDestinations
可以动态绑定的目标列表(例如,在动态路由方案中)。如果设置,只能列出目的地。 默认值:空(允许任何目的地绑定)。
目标列表
bindings
<通道名>
destination
多个通道可以拥有一个主题 spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: t1-input: consumer: # 表示t1-input仅消费带有tag-1的消息 tags: tag-1 t2-input: consumer: # 表示t2-input可以消费带有tag-2或tag-3的消息(||用于分隔不同的tag) tags: tag-2||tag-3 bindings: t1-input: destination: stream-test-topic group: binder-group1 t2-input: destination: stream-test-topic group: binder-group2
设置主题
group
设置消费组
默认null
content-type
频道的内容类型
binder
当存在多个绑定器时,使用该参数来指定当前通道使用的哪个具体的绑定器
默认null
consumer
partitioned
消费者是否开启分区功能
默认 false
headerMode
设置为raw时,禁用输入头标题解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用
默认值:embeddedHeaders
maxAttempts
对输入通道消息处理的最大重试次数
默认3
backOffInitialInterval
重试消息处理的初始间隔时间
默认 1000
backOffMaxInterval
重试消息处理的最大间隔时间
默认 10000
backOffMultiplier
重试消息处理时间间隔 的递增乘数
默认 2.0
concurrency
输入通道消费者的并发数
默认1
producer
partitionCount
如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。 默认值:1
分区数量
partitionKeyExpress
一个确定如何分配出站数据的SpEL表达式。如果设置,或者如果设置了partitionKeyExtractorClass,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。请参阅分区支持。 默认值:null。值可以为 payload,payload.id等(个人猜测:是注入到spring的Bean名称,自定义的分区策略) 其中,payload表示获取消息后,进行hash取值计算出分区的值
分区键
partitionKeyExtractorClass
一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。请参阅分区支持。 默认值:null
partitionSelectorClass
一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。 默认值:null
partitionSelectorExpression
用于自定义分区选择的SpEL表达式。与partitionSelectorClass相互排斥。如果没有设置,则分区将被选择为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。 默认值:null
requiredGroups
生成者必须确保消息传递的组合的逗号分隔列表,即使它们在创建之后启动(例如,通过在RabbitMQ中预先创建持久队列)
headerMode
设置为raw时,禁用输出上的标题嵌入。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。在非Spring Cloud Stream应用程序生成数据时很有用。 默认值:embeddedHeaders
rocketmq
rocketmq特有配置
bindings.<通道名>
producer
group
默认empty
transactional
是否发送事务消息
默认 false
sync
是否使用同步得方式发送消息
默认false
enable
是否启用 Producer
默认true
maxMessageSize
消息发送的最大字节数
默认值: 8249344
vipChannelEnabled
是否在 Vip Channel 上发送消息
默认值: true
sendMessageTimeout
发送消息的超时时间(毫秒)
默认值: 3000
compressMessageBodyThreshold
消息体压缩阀值(当消息体超过 4k 的时候会被压缩)
默认值: 4096
retryTimesWhenSendFailed
在同步发送消息的模式下,消息发送失败的重试次数
默认值: 2
retryTimesWhenSendAsyncFailed
在异步发送消息的模式下,消息发送失败的重试次数
默认值: 2
retryNextServer
消息发送失败的情况下是否重试其它的 broker
默认值: false
consumer
sql
目前仅rocketmq支持 消息发送方setHead("amount",100) # 表示t1-input仅消费amount小于等于100的消息 sql: 'amount <= 100' 特别注意: 默认情况下,RocketMQ的SQL过滤支持是关闭的,所以我们需要通过添加一些配置项进行开启。首先进入RocketMQ的安装目录,然后编辑conf/broker.conf文件: [root@study-01 ~]# cd /usr/local/rocketmq-4.5.1/ [root@study-01 /usr/local/rocketmq-4.5.1]# vim conf/broker.conf 在文件末尾添加如下配置项: enablePropertyFilter = true
默认值: empty
tags
目前仅rocketmq支持 值的定义符合rocketmq的tag定义方式。如 tag-1 tag-1||tag-2
Consumer 基于 TAGS 订阅,多个 tag 以 || 分割
默认值: empty
enable
是否启用 Consumer
默认值: true
broadcasting
Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式
默认false
orderly
Consumer 是否同步消费消息模式
默认false
delayLevelWhenNextConsume
异步消费消息模式下消费失败重试策略:-1,不重复,直接放入死信队列0,broker 控制重试策略>0,client 控制重试策略
默认0
suspendCurrentQueueTimeMillis
同步消费消息模式下消费失败后再次消费的时间间隔
默认1000
binder
name-server
name-server的地址。如: 192.168.190.129:9876
access-key
secret-key
customized-trace-topic
enable-msg-trace
未知
group