基于Redis的分布式队列可以分为双端队列 ,阻塞队列(Blocking Queue),有界阻塞队列(Bounded Blocking Queue),阻塞双端队列(Blocking Deque),阻塞公平队列(Blocking Fair Queue),阻塞公平双端队列(Blocking Fair Deque) 不管是什么队列,其底层核心的执行逻辑仍旧是基于发布-订阅的主题来实现的
这里的消费消费并不是跟RabbitMQ的消费者一样,RabbitMQ是主动的将消息推送给消费者,但是Redisson需要在某个地方不断的监听是否有消息过来,从而决定是否需要执行相应的业务逻辑。
package com.learn.boot.config; import com.learn.boot.rdto.RMapDto; import org.redisson.api.RQueue; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Component public class QueueRunner implements CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(QueueRunner.class); @Autowired private RedissonClient redissonClient; /** * 创建线程池去消费消息 */ public static ExecutorService queueThreadPool = Executors.newFixedThreadPool(1); @Override public void run(String... args) throws Exception { queueThreadPool.execute(() -> { String key = "myRedissonKey"; while (true) { RQueue<RMapDto> rQueue = redissonClient.getQueue(key); RMapDto rMapDto = rQueue.poll(); if (rMapDto != null) { log.info("队列消费消息监听到数据{}",rMapDto); } } }); } }之前我们用到的死信队列是RabbitMQ的,但是它有一个不是完美的点,不能为每个消息单独设置TTL,即使可以设置,但是依然保持着先进先出的的机制,也就是说设置了无效。 下面我们验证下RabbitMQ的这个问题
/* --------------------------------------------------------验证死信队列的不足之处------------------------------------------------ */ // 创建第一个中转站 //创建死信队列 @Bean(name = "testDealOrderQueue") public Queue testDealOrderQueue() { Map<String, Object> params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", "testDeadOrderExchange"); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", "test-order-key"); return new Queue("testDealOrderQueue", true, false, false, params); } //创建“基本消息模型”的基本交换机,面向生产者 @Bean public TopicExchange testOrderExchange() { //创建并返回基本交换机实例 return new TopicExchange("testOrderExchange", true, false); } //创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者 @Bean public Binding testOrderBinding() { //创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由) return BindingBuilder.bind(testDealOrderQueue()).to(testOrderExchange()).with("order-normal-test-key"); } // 创建第二个中转站 // 创建真正队列,面向消费者 @Bean(name = "testOrderQueue") public Queue testOrderQueue() { //创建并返回面向消费者的真正队列实例 return new Queue("testOrderQueue", true); } // 创建死信交换机 @Bean public TopicExchange testDeadOrderExchange() { //创建并返回死信交换机实例 return new TopicExchange("testDeadOrderExchange", true, false); } // 创建死信路由及其绑定 @Bean public Binding testDeadOrderBinding() { //创建死信路由及其绑定实例 return BindingBuilder.bind(testOrderQueue()).to(testDeadOrderExchange()).with("test-order-key"); } /** * 监听真正队列——消费队列中的消息,面向消费者 * @param msg */ @RabbitListener(queues = "testOrderQueue",containerFactory = "singleListenerContainer") public void testOrderQueue(@Payload String msg){ try { log.info("死信队列超时订单-监听真正队列-消费队列中的消息,监听到消息内容 为:{}",msg); }catch (Exception e){ } } @RequestMapping("/testRabbitMQ") public ResultVo testRabbitMQ() throws JsonProcessingException { // 发送三个不同TTL的消息 Message message1 = MessageBuilder.withBody(objectMapper.writeValueAsBytes("5秒")) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setExpiration(String.valueOf(5000)) .build(); // 发送三个不同TTL的消息 Message message2 = MessageBuilder.withBody(objectMapper.writeValueAsBytes("10秒")) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setExpiration(String.valueOf(10000)) .build(); // 发送三个不同TTL的消息 Message message3 = MessageBuilder.withBody(objectMapper.writeValueAsBytes("15秒")) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setExpiration(String.valueOf(15000)) .build(); // 注意这里是发送到正常的队列 绑定到一个正常的路由 rabbitTemplate.convertAndSend("testOrderExchange","order-normal-test-key", message3); rabbitTemplate.convertAndSend("testOrderExchange","order-normal-test-key", message2); rabbitTemplate.convertAndSend("testOrderExchange","order-normal-test-key", message1); return ResultVo.success(); }Redisson 同样也提供了“延迟队列”这个强大的功能,但是解决了RabbitMQ的缺陷,不管什么时候都将遵循TTL从小到大的顺序先后被真正的队列监听、消费。 借助阻塞队列作为中转站,用于充当消息的第一个缓冲区,当TTL一到,消息将进入真正的队列被消费 下面我们采用实际的代码实战实现Redisson的DelayQueue的延迟发送,接收消息
@RequestMapping("/testDeadRedisson") public ResultVo testDeadRedisson() { DeadInfo deadInfo1 = new DeadInfo(1,"1"); DeadInfo deadInfo2 = new DeadInfo(2,"2"); DeadInfo deadInfo3 = new DeadInfo(3,"3"); DeadInfo deadInfo4 = new DeadInfo(4,"4"); String key = "myRedissonDeadQueue"; // 这里用到了阻塞队列 RBlockingDeque<DeadInfo> rBlockingDeque = redissonClient.getBlockingDeque(key); // 用阻塞队列创建一个延迟队列 RDelayedQueue<DeadInfo> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingDeque); // 往延迟队列里边添加消息 rDelayedQueue.offer(deadInfo1,5, TimeUnit.SECONDS); rDelayedQueue.offer(deadInfo2,10, TimeUnit.SECONDS); rDelayedQueue.offer(deadInfo3,15, TimeUnit.SECONDS); rDelayedQueue.offer(deadInfo1,20, TimeUnit.SECONDS); return ResultVo.success(); } package com.learn.boot.scheduled; import com.learn.boot.model.DeadInfo; import org.redisson.api.RBlockingDeque; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * Redisson延迟队列的消息模型,消费者,这里采用定时器 轮训的方式 */ @Component @EnableScheduling public class RedissonDelayQueueConsumer { /** * 定义日志 */ private static final Logger log = LoggerFactory.getLogger(RedissonDelayQueueConsumer.class); @Autowired private RedissonClient redissonClient; @Scheduled(cron = "*/1 * * * * ?") public void consumeMsg() throws InterruptedException { String key = "myRedissonDeadQueue"; RBlockingDeque<DeadInfo> rBlockingDeque = redissonClient.getBlockingDeque(key); // 从队列中取出消息 DeadInfo deadInfo = rBlockingDeque.take(); if (deadInfo != null) { log.info("Redisson延迟队列消息模型-消费者-监听消费真正队列中的消息: {} ",deadInfo); //TODO:在这里执行相应的业务逻辑 } } }