SpringCloud Stream(消息驱动)

    科技2022-07-10  182

    概述:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型; 发布订阅模式:Topic主题进行广播@1rabbitMq是Exchange,Kafaka是Topic Binder:通过定义绑定器Binder作为中间件,实现了应用程序域消息中间件细节之间的隔离


    服务端: pom web,actuator,eureka-client,

    <dependency> <groupId>org.springframework.cloud<groupId> <artifactId>spring-cloud-starter-stream-rabbit<artifactId> <dependency>

    yml配置:

    server: port: 8801 spring: application: name:cloud-stream-provider cloud: stream: binders: #在此处配置绑定的rabbitmq的服务信息 defaultRabbit: #表示u定义的名称,用于binding的整合 type:rabbit #消息组件类型 environment #设置rabbitmq的相关的环境配置 spring: rabbitmq: host:localhost port:5672 username:guest password:guest bindings: # 服务的整合处理 output:#这个名字是一个通道的名称 用于发送 destination:studyExchange: #表示使用Exchange名称定义 content-type:application/json #设置消息类型,本次为json,文本则设置“text/plain” biinder:defaultRabbit #设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone:http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds:2 #设置心跳时间 默认30秒 lease-expiration-duration-in-secondes:#间隔时间默认90秒 instance-id:send-8801.com #在消息列表时显示主机名称 prefer-ip-address:true #访问的路劲变为Ip地址

    java代码: 发送消息接口:

    public interface IMessageProvider { public String send(); }

    发送接口实现类:

    @EnableBinding(Source.class) #定义消息推送管道 public class MessageProviderImpl implement IMessageProvideer { @Resource private MessageChannel output;//消息发送通道 类似Dao @Override public String send() { String serial = UUID.randomUUID.toString; output.send(MessageBuilder.withPlayload).build(); return null; } }

    Controller层:

    @RestController public class SendMessageController { @Resource private IMessageProvider messageProvader; @GetMapping public String sendMessage() { return messageProvader.send(); } }

    消费者: pom: web,actuator,eureka-client,

    <dependency> <groupId>org.springframework.cloud<groupId> <artifactId>spring-cloud-starter-stream-rabbit<artifactId> <dependency>

    yml:

    server: port: 8802 spring: application: name:cloud-stream-consumer cloud: stream: binders: #在此处配置绑定的rabbitmq的服务信息 defaultRabbit: #表示u定义的名称,用于binding的整合 type:rabbit #消息组件类型 environment #设置rabbitmq的相关的环境配置 spring: rabbitmq: host:localhost port:5672 username:guest password:guest bindings: # 服务的整合处理 **input**:#这个名字是一个通道的名称 用于接收 destination:studyExchange: #表示使用Exchange名称定义 content-type:application/json #设置消息类型,本次为json,文本则设置“text/plain” biinder:defaultRabbit #设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone:http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds:2 #设置心跳时间 默认30秒 lease-expiration-duration-in-secondes:#间隔时间默认90秒 instance-id:receive-8802.com #在消息列表时显示主机名称 prefer-ip-address:true #访问的路劲变为Ip地址

    java代码:

    @Component @EnableBinding(Sink.class) public class ReceiveController{ @StreamListener(Sink.INPUT) public void input(Message<String> message) { sout(message.getPlayload()) } }

    重复消费问题: 如何解决:分组和持久化属性group;在Stream中处于同一个group中的group的多个消费者时竞争关系,就能够保证消息只会被其中一个应用消费一次; 不同组时可以重复消费的(不配置系统会随机生成组别) 同一组内发送竞争关系,只有其中一个可以消费; 自定义分组: yml:

    **input**:#这个名字是一个通道的名称 用于接收 destination:studyExchange: #表示使用Exchange名称定义 content-type:application/json #设置消息类型,本次为json,文本则设置“text/plain” biinder:defaultRabbit #设置要绑定的消息服务的具体设置 group:aaa

    持久化问题: 配置分组解决

    Processed: 0.013, SQL: 8