专栏名称: 木子教程
十年以上IT工作经验,有架构师、大数据分析、...
目录
相关文章推荐
今天看啥  ›  专栏  ›  木子教程

50 Spring Cloud Stream 自定义消息通道

木子教程  · 简书  ·  · 2022-01-16 19:19

Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出 流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。

interface OrderProcessor {
    String INPUT_ORDER = "inputOrder";
    String OUTPUT_ORDER = "outputOrder";
    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();
    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();
}
  • 一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个 订单输入,和订单输出两个 binding。
  • 使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
  • 使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER
spring:
 cloud:
   stream:
     defaultBinder: defaultRabbit
     bindings:
       inputOrder:
         destination: mqTestOrder
       outputOrder:
         destination: mqTestOrder

如上配置,指定了 destination 为 mqTestOrder 的输入输出流。

消息分组

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例 的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一 条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之 下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来 实现这样的功能。

image-20211231222758423.png

实现的方式非常简单,我们只需要在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性即可,比如我们可以这样实现:

server:
 port: 7003 #服务端口
spring:
 application:
   name: rabbitmq-consumer #指定服务名
 rabbitmq:
   addresses: 127.0.0.1
   username: itcast
   password: itcast
   virtual-host: myhost
 cloud:
   stream:
     bindings:
       input:
         destination: itcast-default
       inputOrder:
         destination: testChannel
         group: group-2
     binders:
       defaultRabbit:
         type: rabbit

在同一个group中的多个消费者只有一个可以获取到消息并消费

消息分区

有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一 个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次 请求取消task,此场景就必须保证两次请求至同一实例.

image-20211231222914420.png

消息消费者配置

 cloud:
   stream:
     instance-count: 2
     instance-index: 0
     bindings:
       input:
         destination: muziwk-default
       inputOrder:
         destination: testChannel
         group: group-2
         consumer:
           partitioned: true
     binders:
       defaultRabbit:
         type: rabbit

从上面的配置中,我们可以看到增加了这三个参数:

  1. spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分 区功能;
  2. spring.cloud.stream.instanceCount :该参数指定了当前消费者的总实例数量;
  3. spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为 spring.cloud.stream.instanceCount 参数 - 1。我们试验的时候需要启动多个实例,可以通过 运行参数来为不同实例设置不同的索引值

消息生产者配置

spring:
 application:
   name: rabbitmq-producer #指定服务名
 rabbitmq:
   addresses: 127.0.0.1
   username: itcast
   password: itcast
   virtual-host: myhost
 cloud:
   stream:
     bindings:
       input:
         destination: muziwk-default
spring:
 application:
   name: rabbitmq-producer #指定服务名
 rabbitmq:
   addresses: 127.0.0.1
   username: muziwk
   password: muziwk
   virtual-host: myhost
 cloud:
   stream:
     bindings:
       input:
         destination: muziwk-default


  1. pring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数 指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区 键;
  2. spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分 区的数量。

到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是 要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实 例在接收和处理这些相同的消息。




原文地址:访问原文地址
快照地址: 访问文章快照