之前使用rabbitMQ时只是简单的应用,停留在demo的概念上,本片博客是结合实际使用时会遇到的问题,在之前那篇文章springboot整合RabbitMQ简单使用基础上,继续进行讨论,有兴趣的可以看看之前那篇文章,里面有一些基本的概念介绍。
本篇文章将尝试回答这么几个问题:
SpringBoot中创建RabbitMQ的队列、交换机、及绑定的几种方式?RabbitMQ中的ack机制是什么、如何使用?RabbitMQ中confirm-callback与return-callback的作用、如何使用?配合redis ,如何保证消息不丢失?RabbitMQ中重试机制,重试后转到其他队列SpringBoot中RabbitMQ的自动配置原理,如何自己扩展配置?其他问题: a.当生产者发送消息时的队列并不存在时,springboot是否会自动创建?
b.当创建的队列或者交换机已经存在时,会如何处理?
方法一:直接进入RabbitMQ后台可视化处理
如下图所示: 一般这种需要运维配合,缺点:因为这都是线上如果操作不当删除其他业务的队列,后果很严重。所以运维一般不愿意采用这种方式,除非是已经发版后,需要紧急修改,不过自己写demo的时候,直接到可视化管理会比较方便。(上篇博客中有稍微具体一点的介绍)。
方法二:容器启动时创建
第一种采用AmqpAdmin管理员类进行操作 (这个类其实就是上面可视化功能抽象出来的类、相关的操作都是这个类所具有都行为,所以用他能够完成一系列配置。) /** * @author liuzihao * @create 2020-10-09-11:07 */ @Configuration public class RabbitMqAdminConfig { public Queue testQueue() { return new Queue("test-admin1"); } public DirectExchange myExchange() { return new DirectExchange("directexchange-admin1"); } public Binding binding() { return BindingBuilder.bind(testQueue()).to(myExchange()).with("test-key-admin1"); } @Bean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { /** * 如果不自己创建amqpAdmin,自动配置类中会创建 */ AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory); amqpAdmin.declareQueue(testQueue()); /* * Exchange是一个接口实现类如下: * 1.DirectExchange * 2.FanoutExchange * 3.TopicExchange * */ amqpAdmin.declareExchange(myExchange()); /** * Binding可以设置的参数: * destination 目的地 * destinationType 绑定的类型 是跟队列绑定 还是交换机绑定 * exchange 交换机的name * routingKey 路由的key * arguments; 参数 可以不设置 */ amqpAdmin.declareBinding(binding()); return amqpAdmin; } }上述这种方法在声明Queue和Exchange时网上很多的方法都会在上面使用@Bean注解放到容器中、其实使用AmqpAdmin这个类时,可以不用放到容器中,直接用admin对象配置基本组件,当然注入也不会出错,接下来介绍的这种方法就是直接放到容器中即可
第二种是直接配置相关对象,注入到容器中 (简单粗暴) 容器在启动过程中会自己帮我我们创建 /** * @author liuzihao * @create 2020-10-08-22:13 * */ @Configuration public class RabbitMqConfig { @Bean public Queue testQueue() { return new Queue("test3"); } @Bean public DirectExchange myExchange() { return new DirectExchange("directexchange3"); } @Bean public Binding binding() { return BindingBuilder.bind(testQueue()).to(myExchange()).with("test-key3"); } }方法三:消费者使用注解@RabbitListener进行创建
因为多播模式下,当某个消费者需要生产者的消息时,按照业务逻辑必然要自己创建队列,并且绑定交换机,这样当有生产者产生消息时,消息就会路由到自己所创建多队列中。(多播模式下由消费者自己维护队列)
@Component public class QueueListener{ //进行监听 @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "lister-test", durable = "true"), exchange = @Exchange(value = "lister-exchange", type = ExchangeTypes.DIRECT, durable = "true"), key = "key-listener" )} ) public void receive(Map map){ System.out.println("监听接收到消息"+map); } }ack 机制是一种确认机制,是消费者从队列中消费消息后的一种应答机制。
默认:采用的方式是自动ack,也就是说,消费者从队列中拿到消息进行业务处理后,队列中的消息就被删除了,如果此刻消费者处理该消息的时候发生异常,导致业务没有正常处理,那么这个该处理的消息也就丢失了。手动ack:当正确消费消息后,调用channel.basicAck(),进行手动确认;如果在处理消息过程中发送异常,可以调用channel.basicNack,表示拒绝消息,并且通过参数设置,可以让没有消费的消息再次回到队列,下次消费(但是不断的重入队列,虽然消息不会丢失、却会造成消息队列阻塞、可以采用重试机制,或者转移到另外的队列如:死信队列中)。 这样的话,控制了消息处理过程中的正确性。消息的确认,其实对于生产者、与消费者可以理解都有他们的ack确认机制
生产者: 保证消息到达交换机 、消息再从交换机到队列消费者:消息正确从队列中被消费那么RabbitMQ都提供了哪些机制来解决这些问题呢?
生产者confirm-callback回调是检验消息是否达到交换机的一个机制。(具体config配置见最后)
// 如果消息没有到exchange, 则confirm回调, ack=false; 如果消息到达exchange, 则confirm回调, ack=true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息确认到达exchange!"); RedisUtils.del(correlationData.getId()); } else { log.error("消息推送MQ Exchange失败, 请检查MQ是否异常, 消息ID: {}", correlationData.getId()); } });效果:
Return-Callback是校验消息是否到达队列的一个机制。(具体config配置见最后)
// 如果exchange到queue成功, 则不回调return; 如果exchange到queue失败, 则回调return(需设置mandatory=true, 否则不会回调, 消息就丢了) rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("消息推送MQ Queue失败, 请检查路由键准确, 交换机:{}, 路由键:{}", exchange, routingKey); // 这里其实可以采用重试机制 // 1、休眠3秒 // 2、重试 RabbitMQUtils.sendMessageToMQ(exchange, routingKey, messageObj); }); 消费者生产者保证消息无误的到达队列后,那么它的使命已经完成、消费者要做的就是如果正确的消费它。在上文中在介绍ack的时候已经提到,如果自动ack可能会导致消息丢失。那么可以采用这几种方案
拒绝消息,让其重回队列,等待下一个消费者消费(可能会阻塞队列)抛出异常 启用重试机制、到达重试阈值后,消息转到其他队列(利用RepublishMessageRecoverer类)(不能开启重入,会阻塞)拒绝无法处理的消息转移到死信队列中特殊处理(需要配置死信队列)拒绝消息、不重入队列,记录日志后续处理(与重试机制类似,少了重试)例子:
@Component public class QueueListener{ @RabbitListener(queues = {"my.amqpAdmin.queue"}) /** * @Payload Employee employee 表示消息的载体 (也可不用注解标注) * @Header(AmqpHeaders.CHANNEL) 取Channel信息 (也可不用注解标注) * @Header(AmqpHeaders.DELIVERY_TAG) Long tag 获取消息的标志id * (tag 也可以在方法产生通过注入 Message 然后通过message.getMessageProperties().getDeliveryTag()进行获取) */ public void getMessage(@Payload Employee employee, @Header(AmqpHeaders.CHANNEL) Channel channel1, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws Exception{ /** * 假设当消息Employee age为5的时候拒绝消息 */ if (employee.getAge() == 5) { /** * @param long deliveryTag 消息的标志 * @param boolean multiple 是否批量处理 * @param boolean requeue 是否重入队列 */ System.out.println("消费者,拒绝消息" + employee); channel1.basicNack(tag, false, false); /** * 要想重试机制生效,进行相关配置后、还需要抛出异常,并且requeue = fasle(不然会一直阻塞) * throw new RuntimeException("111"); */ }else { /** * @param long deliveryTag 消息的标志 * @param boolean multiple 是否批量处理 */ channel1.basicAck(tag, false); System.out.println("消费者,成功处理应答==>"+employee); } } }画图不容易啊
虽然生产者一方存在,消息到达回调机制,但是仍有可能消息还没到达mq时,消息就丢失了,所以我们可以采用
redis缓存 + 定时任务补偿的方案 如下图:(画图不易,可以点个赞)一、消息如果没有到达exchange、定时任务会根据redis里的缓存进行补偿 二、消息如果没有到队列,会有重发机制
采用redis缓存,再加上回调重试机制,可以说保证了消息不丢失送达队列,
上述方案,会有消息重发的风险,所以需要消费者方根据业务键,做好幂等处理
- mq的重试是针对消费者的,并不是说消息重新进入队列,然后被消费。二者没有因果关系、没有因果关系、没有因果关系. 是消费者抛出异常后的一种重试机制,想要触发异常需要把异常抛出来
配置(配置类代码见最后): # 开启重试 spring.rabbitmq.listener.simple.retry.enabled=true # 重试次数,默认为3次 spring.rabbitmq.listener.simple.retry.max-attempts=5 # 也可以设置重试时间间隔但是发送重试后,消息还是没有处理成功的话,就会被丢弃,继续处理其他消息。 查看一下消息队列里的消息: 已经被全部check了,那条拒绝的消息也就丢失了。为了解决这个问题,拒绝的消息是不是可以转移到另外的一个队列中呢?大家想到的可能就是死信队列了,死信队列是一种方式,其实我也可以让他转移到我买自定义的业务队列,比如:自定义一个回收队列 rec-queue,然后再针对该队列中无法处理的消息做特殊处理。
方式1:死信队列(如果配置了死信队列,并且nack :requeue = false,会自动发送到死信)方式2:自定义回收器、转发到自定义到业务队列中。下面介绍方式2建立一个回收队列和回收交换机
/** * 创建回收队列 * @return */ @Bean public DirectExchange errorExchange(){ return new DirectExchange("rec-exchange",true,false); } @Bean public Queue errorQueue(){ return new Queue("rec-queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){ return BindingBuilder.bind(errorQueue).to(errorExchange).with("routing-key"); }建立完回收队列,那么如果让重试后到消息回收到改队列呢?那么下面了解一个接口:MessageRecoverer,名字就是消息回收器,主要是当所有尝试都失败时回回调。
public interface MessageRecoverer { // 回调被消耗但所有重试尝试都失败的消息。 void recover(Message message, Throwable cause); }他有两个实现类:RepublishMessageRecoverer 和RejectAndDontRequeueRecoverer(默认使用该回收器),只要创建RepublishMessageRecoverer类到容器中即可。
/** * 消息回收器 * @param rabbitTemplate * @return */ @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate,"rec-exchange","routing-key"); }这样在重试次数,超过阈值时,将会转移到自定义的 回收队列当中如下图: 再看一下我们定义的回收队列中也就有了重试之后无法处理的消息,我们可以特殊处理。
其他问题: a.当生产者发送消息时的队列并不存在时,springboot是否会自动创建? 答:不会,会抛出异常
b.当创建的队列或者交换机已经存在时,会如何处理? 答:保持原来的队列,或交换机,不会覆盖。(具体源码还没找到,如果有了解的,感谢留言告知)
(因为我生产者和消费者用了两个服务,所以会有两个配置类)
生产者配置类 /** * @author liuzihao * @create 2020-10-08-22:13 * */ @Configuration @Slf4j public class RabbitMqConfig { @Bean public Queue testQueue() { return new Queue("my.amqpAdmin.queue"); } @Bean public DirectExchange myExchange() { return new DirectExchange("my.direct"); } @Bean public Binding binding() { return BindingBuilder.bind(testQueue()).to(myExchange()).with("my.key"); } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * MQ发送方配置 * * @param connectionFactory * @return */ @Bean @Primary public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { // 若使用confirm-callback, 必须设置publisherConfirms为true connectionFactory.setPublisherConfirms(true); // 若使用return-callback, 必须设置publisherReturns为true connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 使用return-callback, 必须设置mandatory为true rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(messageConverter()); // 如果消息没有到exchange, 则confirm回调, ack=false; 如果消息到达exchange, 则confirm回调, ack=true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息确认到达exchange!"); /** * * 删除缓存在redis中的消息(保证消息不丢失,可以在发送消息的时候缓存在redis中) * RedisUtils.del(correlationData.getId()); */ } else { log.error("消息推送MQ Exchange失败, 请检查MQ是否异常, 消息ID: {}", correlationData.getId()); } }); // 如果exchange到queue成功, 则不回调return; 如果exchange到queue失败, 则回调return(需设置mandatory=true, 否则不会回调, 消息就丢了) rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("消息推送MQ Queue失败, 请检查路由键准确, 交换机:{}, 路由键:{}", exchange, routingKey); /** * 重试机制 * 1、消息反序列化 * Object messageObj = new Jackson2JsonMessageConverter().fromMessage(message); * 2、休眠 * 3、重新发送到mq * RabbitMQUtils.sendMessageToMQ(exchange, routingKey, messageObj); */ }); return rabbitTemplate; } } 消费者配置类 @Configuration public class MyConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @Bean public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); // 手动应答 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 重试需要设置为false factory.setDefaultRequeueRejected(false); return factory; } /** * 创建回收队列 * @return */ @Bean public DirectExchange errorExchange(){ return new DirectExchange("rec-exchange",true,false); } @Bean public Queue errorQueue(){ return new Queue("rec-queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){ return BindingBuilder.bind(errorQueue).to(errorExchange).with("routing-key"); } /** * 消息回收器 * @param rabbitTemplate * @return */ @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate,"rec-exchange","routing-key"); } } github:代码地址:注意是在master分支,默认是main分支