备用交换机
备用交换机介绍
如果主交换机无法路由消息,那么消息将被路由到备用交换机上如果设置了失败者通知又有备用交换机那么消息会发送到备用交换机上失败者通知不会生效如果备用交换机也无法匹配那么那条消息会有失败通知(如果失败通知开启了)
创建步骤与代码
创建备用交换机创建主交换机并将备用交换机绑定到主交换机上
channel
.exchangeDeclare("shanguoyu2", BuiltinExchangeType
.DIRECT
,false,false,false,null
);
Map
<String, Object> argsMap
= new HashMap<>();
argsMap
.put("alternate-exchange","shanguoyu2");
channel
.exchangeDeclare("shanguoyu3", BuiltinExchangeType
.DIRECT
,false,false,false,argsMap
);
死信交换机(死信队列)
死信交换机介绍
用来接收死掉消息的交换机死信交换器是要对队列设置的当指定队列出现了消息死亡的情况那么消息会被转发到死信交换器中
消息出现死亡的情况
消费者拒绝消费并且不允许重新投递消息过期队列达到最大长度(先进先出,最先进去的消息成为死掉的消息)
创建步骤与代码
创建一个交换机在创建队列的时候指定那个交换机为死信交换机
channel
.exchangeDeclare("shanguoyu4", BuiltinExchangeType
.DIRECT
,false,false,false,null
);
Map
<String, Object> argsMap
= new HashMap<>();
argsMap
.put("x-dead-letter-exchange","shanguoyu4");
channel
.queueDeclare("shanguoyu2",false,false,false,argsMap
);
延时队列
延时队列介绍
延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。有两种实现方式:TTL+DLX或rabbitmq-delayed-message-exchange 插件可用于订单超时等场景
TTL+DLX实现
流程图
代码
public class QueueTest3 {
public static void main(String
[] args
) throws IOException
, TimeoutException
{
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory
.setHost("192.168.1.108");
connectionFactory
.setPort(5672);
connectionFactory
.setVirtualHost("/");
connectionFactory
.setUsername("root");
connectionFactory
.setPassword("123456");
Connection connection
= connectionFactory
.newConnection();
Channel channel
= connection
.createChannel();
channel
.exchangeDeclare("DEAD_LETTER_EXCHANGE", BuiltinExchangeType
.DIRECT
,false,false,false,null
);
channel
.queueDeclare("DEAD_LETTER_QUEUE", false, false, false, null
);
channel
.queueBind("DEAD_LETTER_QUEUE", "DEAD_LETTER_EXCHANGE", "DL");
channel
.exchangeDeclare("shanguoyu", BuiltinExchangeType
.DIRECT
,false,false,false,null
);
Map
<String,Object> map
=new HashMap<>();
map
.put("x-message-ttl", TimeUnit
.SECONDS
.toMillis(20));
map
.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE");
map
.put("x-dead-letter-routing-key","DL");
channel
.queueDeclare("shanguoyu", false, false, false, map
);
channel
.queueBind("shanguoyu", "shanguoyu", "shanguoyu");
}
}
class Product{
public static void main(String
[] args
) throws IOException
, TimeoutException
{
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory
.setHost("192.168.1.108");
connectionFactory
.setPort(5672);
connectionFactory
.setVirtualHost("/");
connectionFactory
.setUsername("root");
connectionFactory
.setPassword("123456");
Connection connection
= connectionFactory
.newConnection();
Channel channel
= connection
.createChannel();
channel
.basicPublish("shanguoyu", "shanguoyu", null
, "Hello".getBytes());
channel
.close();
connection
.close();
}
}
缺点
如果是队列级别的消息过期时间, 一旦过期时间要有很多,比如30秒,50秒,40秒后去执行,那么需要创建很多交换机和队列来路由消息如果单独设置消息的 TTL,则可能会造成队列中的消息阻塞,比如在队列中有2条消息,第一条过期时间为10秒,第二条消息过期时间为5秒,按理来说应该是第2条消息出队,但是由于第一条消息 还未出队,所以无法投递
插件完成延时队列
插件介绍
在 RabbitMQ 3.5.7 及 以 后 的 版 本 提 供 了一 个 插 件 (rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖 Erlang/OPT 18.0 及以上插件下载地址:https://bintray.com/rabbitmq/community-plugins/download_file?file_path=3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
安装插件
进入RabbitMQ的插件目录下载插件——下载插件——wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip使用unzip解压启动插件——rabbitmq-plugins enable rabbitmq_delayed_message_exchange停止插件命令是——rabbitmq-plugins disable rabbitmq_delayed_message_exchange
代码
public class QueueTest4 {
public static void main(String
[] args
) throws IOException
, TimeoutException
{
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory
.setHost("192.168.1.108");
connectionFactory
.setPort(5672);
connectionFactory
.setVirtualHost("/");
connectionFactory
.setUsername("root");
connectionFactory
.setPassword("123456");
Connection connection
= connectionFactory
.newConnection();
Channel channel
= connection
.createChannel();
Map
<String, Object> argss
= new HashMap<String, Object>();
argss
.put("x-delayed-type", "direct");
channel
.exchangeDeclare("dea.letter.exchange", "x-delayed-message",false,false,false,argss
);
channel
.queueDeclare("dead.letter.queue", false, false, false, null
);
channel
.queueBind("dead.letter.queue", "dea.letter.exchange", "DL");
channel
.close();
connection
.close();
}
}
class Product2{
public static void main(String
[] args
) throws IOException
, TimeoutException
, InterruptedException
{
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory
.setHost("192.168.1.108");
connectionFactory
.setPort(5672);
connectionFactory
.setVirtualHost("/");
connectionFactory
.setUsername("root");
connectionFactory
.setPassword("123456");
Connection connection
= connectionFactory
.newConnection();
Channel channel
= connection
.createChannel();
AMQP
.BasicProperties properties
= new AMQP.BasicProperties();
Map
<String, Object> stringObjectHashMap
= new HashMap<>();
stringObjectHashMap
.put("x-delay", 20000);
AMQP
.BasicProperties build
= properties
.builder().headers(stringObjectHashMap
).build();
channel
.basicPublish("dea.letter.exchange","DL",build
, "Hello".getBytes());
Thread
.sleep(20000);
System
.out
.println("到达了!");
channel
.close();
connection
.close();
}
}