15.SpringBoot整合RabbitMQ

    科技2022-08-17  111

    前置准备

    创建SpringBoot项目(版本2.2);准备RabbitMQ服务器(版本3.7.17);引入依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    整合生产者

    配置文件

    # RabbitMQ producer
    # Broker connection settings (virtual host, credentials, host and port).
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.username=root
    spring.rabbitmq.password=123456
    spring.rabbitmq.host=192.168.1.108
    spring.rabbitmq.port=5672
    # Publisher confirm / return settings; the producer config classes in this article
    # register a ConfirmCallback and a ReturnCallback on the RabbitTemplate.
    spring.rabbitmq.publisher-confirm-type=correlated
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    # Template retry settings.
    # NOTE(review): retry.enabled is false, so the remaining retry.* values presumably
    # have no effect until it is switched on - confirm.
    spring.rabbitmq.template.retry.enabled=false
    spring.rabbitmq.template.retry.max-attempts=3
    spring.rabbitmq.template.retry.initial-interval=1000
    spring.rabbitmq.template.retry.max-interval=10000
    spring.rabbitmq.template.retry.multiplier=1.0

    配置类1

    @Configuration public class RabbitConfig { @Bean public DirectExchange directExchange() { return new DirectExchange("shanguoyu", false, false, null); } @Bean public Queue directQueue() { return new Queue("shanguoyu", false, false, false, null); } @Bean public Binding directBinding(Queue directQueue, DirectExchange directExchange) { Binding binding = BindingBuilder.bind(directQueue).to(directExchange).with("shanguoyu"); return binding; } /** * 生产者确认 * * @return */ @Bean public RabbitTemplate.ConfirmCallback confirmCallback(RabbitTemplate rabbitTemplate) { RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { if (ack) { System.out.println("已发送到交换器!"); } else { //处理失败的消息 System.out.println("出现了异常!(考虑重发)"); System.out.println("原因是:" + cause); } }; rabbitTemplate.setConfirmCallback(confirmCallback); return confirmCallback; } /** * 失败通知 */ @Bean public RabbitTemplate.ReturnCallback returnCallback(RabbitTemplate rabbitTemplate) { RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey) -> { System.out.println("失败通知:消息不可路由!"); System.out.println("消息是:" + new String(message.getBody())); System.out.println("交换器是::" + exchange); System.out.println("路由键是:" + routingKey); System.out.println("状态码:" + replyCode); System.out.println("错误原因:" + replyText); }; rabbitTemplate.setReturnCallback(returnCallback); return returnCallback; } }

    配置类2

    @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.template.mandatory}") private boolean mandatory; //其他的@value没有写 @Bean public DirectExchange directExchange() { return new DirectExchange("shanguoyu", false, false, null); } @Bean public Queue directQueue() { return new Queue("shanguoyu", false, false, false, null); } @Bean public Binding directBinding(Queue directQueue, DirectExchange directExchange) { Binding binding = BindingBuilder.bind(directQueue).to(directExchange).with("shanguoyu"); return binding; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback, RabbitTemplate.ReturnCallback returnCallback){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //TODO 失败通知 rabbitTemplate.setMandatory(true); //TODO 发送方确认 rabbitTemplate.setConfirmCallback(confirmCallback); //TODO 失败回调 rabbitTemplate.setReturnCallback(returnCallback); return rabbitTemplate; } /** * 生产者确认 * * @return */ @Bean public RabbitTemplate.ConfirmCallback confirmCallback() { RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> { if (ack) { System.out.println("已发送到交换器!"); } else { //处理失败的消息 System.out.println("出现了异常!(考虑重发)"); System.out.println("原因是:" + cause); } }; return confirmCallback; } /** * 失败通知 */ @Bean public RabbitTemplate.ReturnCallback returnCallback() { RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey) -> { System.out.println("失败通知:消息不可路由!"); System.out.println("消息是:" + new String(message.getBody())); System.out.println("交换器是::" + exchange); System.out.println("路由键是:" + routingKey); System.out.println("状态码:" + replyCode); System.out.println("错误原因:" + replyText); }; return returnCallback; } }

    整合消费者

    配置文件

    # RabbitMQ consumer
    # Broker connection settings (host, port, virtual host, credentials).
    spring.rabbitmq.host=192.168.1.108
    # NOTE(review): the producer configuration in this article uses port 5672 on the
    # same host - confirm that 5673 is intentional (e.g. a second node) and not a typo.
    spring.rabbitmq.port=5673
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.username=root
    spring.rabbitmq.password=123456
    # Listener container settings. Acknowledge mode is manual: the listener class in
    # this article calls channel.basicAck itself.
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # NOTE(review): prefetch=0 presumably means "no prefetch limit" - confirm this is intended.
    spring.rabbitmq.listener.simple.prefetch=0
    spring.rabbitmq.listener.simple.concurrency=1
    spring.rabbitmq.listener.simple.max-concurrency=5
    spring.rabbitmq.listener.simple.default-requeue-rejected=false

    监听类

    /**
     * Consumer for the "shanguoyu" queue. Prints each message body and acknowledges
     * the delivery manually.
     */
    @Component
    public class RabbitReceiver {

        /**
         * Handles one delivery from the "shanguoyu" queue.
         *
         * @param message the received message
         * @param channel the channel the message was delivered on; used for the manual ack
         * @throws Exception if acknowledging the delivery fails
         */
        @RabbitListener(queues = {"shanguoyu"})
        public void onMessage(Message message, Channel channel) throws Exception {
            MessageProperties messageProperties = message.getMessageProperties();
            long deliveryTag = messageProperties.getDeliveryTag();
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            // Acknowledge only this delivery (multiple=false).
            channel.basicAck(deliveryTag, false);
        }
    }
    Processed: 0.026, SQL: 9