09.部分扩展参数详解和延时队列

    科技2022-07-11  113

    备用交换机

    备用交换机介绍

    如果主交换机无法路由消息,那么消息将被路由到备用交换机上如果设置了失败者通知又有备用交换机那么消息会发送到备用交换机上失败者通知不会生效如果备用交换机也无法匹配那么那条消息会有失败通知(如果失败通知开启了)

    创建步骤与代码

    创建备用交换机创建主交换机并将备用交换机绑定到主交换机上 //我这里虽然创建了备用交换机但是并没有队列与他绑定!,为了好理解并没有掺杂其他的代码 //创建备用交换机 channel.exchangeDeclare("shanguoyu2", BuiltinExchangeType.DIRECT,false,false,false,null); //创建主交换机并用备用交换机绑定到主交换机上 Map<String, Object> argsMap = new HashMap<>(); argsMap.put("alternate-exchange","shanguoyu2"); channel.exchangeDeclare("shanguoyu3", BuiltinExchangeType.DIRECT,false,false,false,argsMap);

    死信交换机(死信队列)

    死信交换机介绍

    用来接收死掉消息的交换机死信交换器是要对队列设置的当指定队列出现了消息死亡的情况那么消息会被转发到死信交换器中

    消息出现死亡的情况

    消费者拒绝消费并且不允许重新投递消息过期队列达到最大长度(先进先出,最先进去的消息成为死掉的消息)

    创建步骤与代码

    创建一个交换机在创建队列的时候指定那个交换机为死信交换机 //创建死信交换机(注意:没有为死信交换机指定绑定) channel.exchangeDeclare("shanguoyu4", BuiltinExchangeType.DIRECT,false,false,false,null); //创建队列是的时候绑定私信交换机 Map<String, Object> argsMap = new HashMap<>(); argsMap.put("x-dead-letter-exchange","shanguoyu4"); channel.queueDeclare("shanguoyu2",false,false,false,argsMap);

    延时队列

    延时队列介绍

    延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。有两种实现方式:TTL+DLX或rabbitmq-delayed-message-exchange 插件可用于订单超时等场景

    TTL+DLX实现

    流程图

    代码

    /** * 演示TTL+DLX完成延时队列 */ public class QueueTest3 { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接IP connectionFactory.setHost("192.168.1.108"); //连接端口 connectionFactory.setPort(5672); //虚拟主机(默认为/) connectionFactory.setVirtualHost("/"); //账号 默认是guest connectionFactory.setUsername("root"); //密码 默认是guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建死信交换器 与队列等 channel.exchangeDeclare("DEAD_LETTER_EXCHANGE", BuiltinExchangeType.DIRECT,false,false,false,null); channel.queueDeclare("DEAD_LETTER_QUEUE", false, false, false, null); channel.queueBind("DEAD_LETTER_QUEUE", "DEAD_LETTER_EXCHANGE", "DL"); //创建交换器队列等并绑定死信交换器并指定队列中的消息的过期时间为5秒 channel.exchangeDeclare("shanguoyu", BuiltinExchangeType.DIRECT,false,false,false,null); Map<String,Object> map=new HashMap<>(); map.put("x-message-ttl", TimeUnit.SECONDS.toMillis(20)); map.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE"); map.put("x-dead-letter-routing-key","DL"); channel.queueDeclare("shanguoyu", false, false, false, map); channel.queueBind("shanguoyu", "shanguoyu", "shanguoyu"); } } /** * 生产者生产消息 */ class Product{ public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接IP connectionFactory.setHost("192.168.1.108"); //连接端口 connectionFactory.setPort(5672); //虚拟主机(默认为/) connectionFactory.setVirtualHost("/"); //账号 默认是guest connectionFactory.setUsername("root"); //密码 默认是guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); channel.basicPublish("shanguoyu", "shanguoyu", null, "Hello".getBytes()); channel.close(); connection.close(); } }

    缺点

    如果是队列级别的消息过期时间, 一旦过期时间要有很多,比如30秒,50秒,40秒后去执行,那么需要创建很多交换机和队列来路由消息如果单独设置消息的 TTL,则可能会造成队列中的消息阻塞,比如在队列中有2条消息,第一条过期时间为10秒,第二条消息过期时间为5秒,按理来说应该是第2条消息出队,但是由于第一条消息 还未出队,所以无法投递

    插件完成延时队列

    插件介绍

    在 RabbitMQ 3.5.7 及 以 后 的 版 本 提 供 了一 个 插 件 (rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖 Erlang/OPT 18.0 及以上插件下载地址:https://bintray.com/rabbitmq/community-plugins/download_file?file_path=3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

    安装插件

    进入RabbitMQ的插件目录下载插件——下载插件——wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip使用unzip解压启动插件——rabbitmq-plugins enable rabbitmq_delayed_message_exchange停止插件命令是——rabbitmq-plugins disable rabbitmq_delayed_message_exchange

    代码

    /** * 演示插件完成延时队列 */ public class QueueTest4 { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接IP connectionFactory.setHost("192.168.1.108"); //连接端口 connectionFactory.setPort(5672); //虚拟主机(默认为/) connectionFactory.setVirtualHost("/"); //账号 默认是guest connectionFactory.setUsername("root"); //密码 默认是guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); Map<String, Object> argss = new HashMap<String, Object>(); argss.put("x-delayed-type", "direct"); channel.exchangeDeclare("dea.letter.exchange", "x-delayed-message",false,false,false,argss); channel.queueDeclare("dead.letter.queue", false, false, false, null); channel.queueBind("dead.letter.queue", "dea.letter.exchange", "DL"); channel.close(); connection.close(); } } class Product2{ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接IP connectionFactory.setHost("192.168.1.108"); //连接端口 connectionFactory.setPort(5672); //虚拟主机(默认为/) connectionFactory.setVirtualHost("/"); //账号 默认是guest connectionFactory.setUsername("root"); //密码 默认是guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); Map<String, Object> stringObjectHashMap = new HashMap<>(); stringObjectHashMap.put("x-delay", 20000); AMQP.BasicProperties build = properties.builder().headers(stringObjectHashMap).build(); channel.basicPublish("dea.letter.exchange","DL",build , "Hello".getBytes()); Thread.sleep(20000); System.out.println("到达了!"); channel.close(); connection.close(); } }
    Processed: 0.009, SQL: 8