RabbitMQ有六种不同的工作模式
参考: https://www.rabbitmq.com/getstarted.html
www.rabbitmq---->Get Started------------>RabbitMQ Tutorials
给生产者和消费者配置maven坐标依赖
<dependencies> <!--RabbitMQ java客户端依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies> <build> <plugins> <!--编译插件--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins>运行,然后访问 http://192.168.147.133:15672/#/queues 如图
运行,看控制台的打印信息
对于一些任务繁重,或任务较多的情况下使用工作模式可以提高任务处理的速度
先运行两消费者,消费者一直监听这RabbitMQ
然后启动生产者,看看消费者的控制台发现它们分担了处理
交换机需要和队列进行绑定,绑定之后;一个消息可以被多个消费者收到
启动所有的消费者
然后使用生产者发送消息,在每个消费者对应的控制台可以查看到生产者发送的所有消息
Routing路由模式要求队列在绑定交换机是要指定routing key,消息会转发的符合的routing key的队列
启动所有消费者,然后使用生产者发送消息
查看控制台的打印信息
Topic通配符模式可以实现PusSub发布与订阅模式和Routing路由模式
Topic通配符模式在路由名称上使用通配符,显得更加灵活
使用通配符:
*:表示一个词
例如:cdw.*
cdw.test1 可以匹配到
cdw.test1.test2 这不止一个词了,所以匹配不到
#:表示至少一个词和多个词
例如:cdw.#
dw.test1 可以匹配到
cdw.test1.test2 这不止一个词,所以也可以匹配到
cdw.# : 表示以cdw开头的都能匹配到
#.cdw:表示以cdw结尾的都能匹配到
一个生产者,一个消费者,不需要设置交换机(使用默认交换机)
一个生产者,多个消费者(竞争关系),不需要设置 交换机(使用默认交换机)
需要设置类型为FANOUT的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机将消息发送到绑定的队列
需要设置类型为DIRECT的交换机,交换机和队列进行绑定,并且知道routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
需要设置类型为TOPIC的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机很根据routing key将消息发送到对应的队列
和配置文件的要对上
然后重新onMessage方法,然后直接打印就可以
package cn.cdw.rabbitmq.listener; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class TopicListenerWell2 implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); } }然后写有一个测试方法,让它一直不停
package cn.cdw.rabbitmq.listener; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class QueueTest { @Test public void queueTest() { boolean flag=true; while (true) { } } }新建一个maven工程
编写yaml配置,基本信息配置
#配置RabbitMQ的基本信息 ip,端口,虚拟机,用户名,密码 spring: rabbitmq: host: 192.168.147.133 port: 5672 virtual-host: /cdw username: cdw password: cdw注入RabbitmqTemplate,调用方法,完成消息发送
package cn.cdw.producer; import cn.cdw.producer.config.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner.class) public class RabbitMQConfigTest { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; //编写一个测试方法发送消息 @Test public void sendTest() { String message = "Spring Boot TEST SUCCESS"; rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "cdw.test", message); } } 然后运行测试方法,查看RabbitMQ中是否有交换机,队列和消息
如果没有数据了,就使用生产者生产数据,然后在启动消费者
杜绝任何消息丢失或传递失败场景
RabbitMQ提供了两种方式用来控制消息的投递可靠性模式
confirm 确认模式
return 退回模式
消息从Producer到Exchange,则会返回一个confirmCallBack
package cn.cdw.producer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; //运行环境 @RunWith(SpringJUnit4ClassRunner.class) //加载配置文件 @ContextConfiguration(locations = "classpath:rabbitmq-producer.xml") public class ProducerTest { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; /* 确认模式 步骤: 确认模式的开启: ConnectionFactory中开启 在配置文件中配置 publisher-confirms="true" ,开启确认模式,它模式是false的 使用RabbitTemplate定义ConfirmCallBack回调函数 true 表示发送消息到交换机成功 false 表示发送消息到交换机失败 */ //编写一个测试方法 @Test public void test() { //使用RabbitTemplate定义ConfirmCallBack回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { /* confirm(CorrelationData correlationData, boolean ack, String cause) 参数详解: 参数1:correlationData 相关的配置信息 参数2: ack exchange交换机是否成收到消息。true 成功,false失败 参数3: cause 失败的原因 */ if (ack) { //exchange接收消息成功 System.out.println("exchange接收消息成功: " + cause); }else { //如果想测试它失败,我们只需要修改下面的发送消息的交换机的名称不存在的就可以,因为它没法发送消息到交换机,所以当然失败了 System.out.println("exchange接收消息失败: " + cause); //做一些处理,让消息再次发送 } System.out.println("confirm方法被执行了。。。。"); } }); //测试发送一条消息,上面confirm方法是否执行 String message = "Confirm TEST SUCCESS ,我已经发送了消息"; //使用RabbitTemplate给交换机发送一条消息 rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName", "confirm_routingKey", message); //rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName_test", "confirm_routingKey", message); } }当消息发送给Exchange后,Exchange路由到Queue失败时才会执行ReturnBack
/* 测试Return回退模式 */ @Test public void testReturn() { /* 当消息发送给Exchange后,Exchange路由到Queue失败时,才会执行ReturnCallBack 步骤 开启回退模式 在配置文件中设置开启 publisher-returns="true" 设置ReturnCallBack 设置Exchange处理消息的模式(两种模式) 如果消息没有路由到Queue,则丢弃消息(默认) 如果消息没有路由到Queue,返回消息发送方,就是ReturnCallBack */ //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true);//设为true,它就不会执行默认的把消息丢弃了,而是将消息返回消息发送方 //使用RabbitTemplate定义ReturnCallBack回调函数 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //如果正常这么测试,这句话是不会执行到的,因为下面发送消息到Exchange到Queue都是正常的 //测试的话,我们可以把它路由的名称改为一个不存在的,达到Exchange发送不到Queue不就可以了, //但是是达到模式了,它还是没有打印出这句话 ,因为我们没有设置它的处理模式,默认它就把消息给丢弃了,所以也不会执行这个方法 //所以我们需要设置处理的模式 rabbitTemplate.setMandatory(true) /* returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 参数详解: 参数1: message 表示消息对象 参数2: replyCode 表示回复的错误码 参数3:replyText 表示回复的错误的信息 参数4:exchange 表示交换机 参数5:routingKey 表示路由键 */ //我们把这些参数都打印出来看看 System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); System.out.println("return方法被执行了。。。"); } }); //使用RabbitTemplate发送消息 String message = "Return TEST SUCCESS , 回退模式测试..."; // rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName", "confirm_routingKey", message); rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName", "confirm_routingKey_test", message); }ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
• 自动确认:acknowledge=“none”
• 手动确认:acknowledge=“manual”
• 根据异常情况确认:acknowledge=“auto”
创建
导入依赖
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>编写RabbitMQ的基本信息的配置文件rabbitmq.properties
rabbitmq.host=192.168.147.133 rabbitmq.port=5672 rabbitmq.username=cdw rabbitmq.password=cdw rabbitmq.virtual-host=/cdw编写消费端的配置文件consumer-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--=============================================================================================================--> <!--定义监听器bean,我创建一个包,专门放置监听器,然后我们去扫描这些包,就加载到bean了,这样我们就不需要定义多个bean了--> <!--然后我只需要在监听器上使用注解@Component标注为是一个组件,包扫描就可以扫描出来了--> <context:component-scan base-package="cn.cdw.consumer.listener"></context:component-scan> <!--定义监听器--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="ackListener" queue-names="test_confirm_queueName"></rabbit:listener> </rabbit:listener-container> </beans>编写监听器
package cn.cdw.consumer.listener; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component; /* Consumer ACK机制 */ //标注为是一个组件 @Component public class AckListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); } }编写测试类,启动监听器监听MQ队列
package cn.cdw.consumer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /* 编写一个测试方法,让消费者一直运行着的,所以使用while(true) */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test() { while (true) { } } }测试,启动测试类,然后启动生产者生产数据
配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--=============================================================================================================--> <!--定义监听器bean,我创建一个包,专门放置监听器,然后我们去扫描这些包,就加载到bean了,这样我们就不需要定义多个bean了--> <!--然后我只需要在监听器上使用注解@Component标注为是一个组件,包扫描就可以扫描出来了--> <context:component-scan base-package="cn.cdw.consumer.listener"></context:component-scan> <!--定义监听器--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!--自动签收监听器--> <!-- <rabbit:listener ref="ackListener" queue-names="test_confirm_queueName"></rabbit:listener> --> <!--手动签收监听器--> <rabbit:listener ref="ackListenerManual" queue-names="test_confirm_queueName"></rabbit:listener> </rabbit:listener-container> </beans>监听器
package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* Consumer ACK机制 设置手动签收: 在配置文件中配置 acknowledge="manual" 让监听器实现ChannelAwareMessageListener这个接口,重写onMessage方法 如果消息成功处理,则调用channel的basicAck()签收 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer */ @Component public class AckListenerManual implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); //获取deliveryTag Long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //接收转换消息 System.out.println(new String(message.getBody())); //处理业务逻辑 System.out.println("处理业务"); //我们在这里设置一个异常,测试处理业务出错了 int i = 1 / 0; //手动签收 /* basicReject(long deliveryTag, boolean multiple) 参数1:deliveryTag 获取deliveryTay Long deliveryTag = message.getMessageProperties().getDeliveryTag() 参数2: multiple 是否需要签收多条消息 */ channel.basicAck(deliveryTag, true); //这就是消息处理成功的正常签收,如果消息处理失败,我们应该拒绝签收,所以我们这里使用try{}catch(){}包裹起来 } catch (Exception e) { // e.printStackTrace(); //拒绝签收 /* basicNack(long deliveryTag, boolean multiple, boolean requeue) 参数1: deliveryTag 获取deliveryTay Long deliveryTag = message.getMessageProperties().getDeliveryTag() 参数2: multiple 是否需要签收多条消息 参数3:requeue 表示重回队列,如果设置为true,则消息重新回到queue,broker会重新发送给消息给消费端 */ channel.basicNack(deliveryTag, true, true); //还有一种方法,但是还是推荐使用上面这个方法 /* basicReject(long deliveryTag, boolean requeue) */ //channel.basicReject(deliveryTag, true); System.out.println("处理业务出错了,我要把消息返回给queue了,你重新再给我发消息吧。。"); } } }测试
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
编写配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--=============================================================================================================--> <!--定义监听器bean,我创建一个包,专门放置监听器,然后我们去扫描这些包,就加载到bean了,这样我们就不需要定义多个bean了--> <!--然后我只需要在监听器上使用注解@Component标注为是一个组件,包扫描就可以扫描出来了--> <context:component-scan base-package="cn.cdw.consumer.listener"></context:component-scan> <!--定义监听器--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <!--自动签收监听器--> <!-- <rabbit:listener ref="ackListener" queue-names="test_confirm_queueName"></rabbit:listener> --> <!--手动签收监听器--> <!-- <rabbit:listener ref="ackListenerManual" queue-names="test_confirm_queueName"></rabbit:listener>--> <!--消费端限流--> <rabbit:listener ref="qosListener" queue-names="test_confirm_queueName"></rabbit:listener> </rabbit:listener-container> </beans>限流监听器
package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* Consumer 限流机制 编写ack机制为ack手动确认 acknowledge="manual" listener-container配置属性 prefetch = 1 , 表示消费端每次从mq中拉取一条消息来消费,直到手动确认消费完毕,才会继续拉取下一条消息。 */ @Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Long deliveryTag = message.getMessageProperties().getDeliveryTag(); //获取消息 System.out.println(new String(message.getBody())); //处理业务逻辑 System.out.println("处理业务逻辑"); //签收,测试我先不手动签收(把它注释掉),我设置了消费端限流每次从mq中拉去一条消息,当消费端全部消费完后进行签收后,才会再去mq中拉去一条消息, //测试结果这里只会拉取一条消息,因为我这里我没有手动签收,所以它不会去拉取下一条消息, //channel.basicAck(deliveryTag, true); //还有不签收,其实这里使用try,catch包裹的,测试我就不写了 //我们在消费端写一个方法,使用循环发送多条数据到mq中进行消费端限流的测试 } }测试
小结
在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。acknowledge=“manual”
TTL全称Time To Live(存活时间、过期时间)。当消息到达存活时间后,会被自动删除。RabbitMQ可以对消息设置过期时间,也可以对整个queue设置过期时间
编写配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--TTL Queue--> <rabbit:queue id="test_queue_ttl" name="test_queueName_ttl"> <rabbit:queue-arguments> <!--设置怎个队列的消息存活时间,这里我设置了10秒--> <!--注意,它的值是number类型的,所以我们需要指定它的value类型--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--TTL Exchange--> <rabbit:topic-exchange name="test_exchangeName_ttl"> <!--绑定queue--> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> </beans>发送消息
/* TTL:过期时间 队列过期时间 */ @Test public void ttlTest() { String message = "TTL TEST SUCCESS,生产端已经发送了消息..."; for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("test_exchangeName_ttl", "ttl.cdw", message); } }查看RabbitMQ是否10秒后将队列的索引消息的清除了
编码
/* TTL:过期时间 消息单独过期时间 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准 队列过期后,会将队列所有消息移除 消息过期后,只有在消息队列的顶端,才会判断其是否过期(移除掉) 就比如说,一共有5条消息,前面有我设置第3条消息5秒后过期,前面两条消息不过期,这时过5秒后,第3条消息是过期了,但是在队列中还是看到的(没有移除掉) 只有当它在消息队列的顶端,才会移除掉 */ @Test public void ttlTest2() { //消息处理对象MessagePostProcessor /* 设置一些消息的参数信息 设置message消息 返回该消息 */ MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置message信息 message.getMessageProperties().setExpiration("5000");//设置消息的过期时间,它的类型是字符串。我这里我设置了5秒过期 //返回该消息 return message; //然后把我们在外面单独声明的消息处理方法,添加到发送消息里边参数去 } }; String message = "TTL TEST SUCCESS,生产端已经发送了消息..."; rabbitTemplate.convertAndSend("test_exchangeName_ttl", "ttl.cdw", message, /* new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { return null; } } */ //上这样写不太直观,我们把消息处理对象单独声明出来,messagePostProcessor这个消息处理的参数,我们在上单独声明了处理方法了,这个把它传过来就可以刻 messagePostProcessor); }小结
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--TTL Queue--> <rabbit:queue id="test_queue_ttl" name="test_queueName_ttl"> <rabbit:queue-arguments> <!--设置怎个队列的消息存活时间,这里我设置了10秒--> <!--注意,它的值是number类型的,所以我们需要指定它的value类型--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--TTL Exchange--> <rabbit:topic-exchange name="test_exchangeName_ttl"> <!--绑定queue--> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 死信队列 声明正常的队列(test_queueName_dlx)和交换机(test_exchangeName_dlx) 声明死信队列queueName_dlx)和死信交换机(exchangeName_dlx) 正常队列绑定死信交换机 设置两个参数: x-dead-letter-exchange 表示死信交换机名称 x-dead-letter-routing-key 发送给死信交换机的routingKey --> <!--声明正常的队列--> <rabbit:queue id="test_queue_dlx" name="test_queueName_dlx"> <!--设置正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--x-dead-letter-exchange 表示死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchangeName_dlx"></entry> <!--x-dead-letter-routing-key 发送给死信交换机的routingKey--> <entry key="x-dead-letter-routing-key" value="dlx.cdw"></entry> <!-- 怎么让一个消息变为死信,三种方式 设置队列过期时间 ttl 设置长度限制 max-length --> <!--设置队列过期时间--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> <!--设置长度限制,我这里设置这个队列最大能存储10条消息,当存储到第11条消息时,第11条消息就是死信消息了--> <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--定义正常交换机--> <rabbit:topic-exchange name="test_exchangeName_dlx"> <!--将正常的队列绑定到正常的交换机--> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--声明死信队列--> <rabbit:queue id="queue_dlx" name="queueName_dlx"></rabbit:queue> <!--声明死信交换机--> <rabbit:topic-exchange name="exchangeName_dlx"> <!--将死信队列绑定到死信交换机--> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> </beans>编写测试代码
/* 死信队列 过期时间 长度限制 消息拒收 */ //过期时间测试 @Test public void dlxTest() { //测试过期时间,死信消息 //发送消息到到正常交换机然后发送到正常队列,这里我设置了10秒这个队列过期,10秒后呢,它将成为死信消息,会发送到死信队列中 String message = "Dlx TEST SUCCESS ,我生产端已经发送消息了..."; rabbitTemplate.convertAndSend("test_exchangeName_dlx", "test.dlx.cdw", message); } //长度限制测试 @Test public void dlxTest2() { //我在配置文件中设置了这个队列只能存储10条消息,超过10条消息后,超过得消息将成为死信消息劝,会发送到死信队列,我在配置中配置有过10秒整个队列过期,10秒后所有的消息将都发送到死信队列 //这里我使用循环发送20条数据 for (int i = 1; i <= 20; i++) { String message = i + " :DLX TEST SUCCESS,我会成为死信吗?"; rabbitTemplate.convertAndSend("test_exchangeName_dlx", "test.dlx.cn", message); } } //消息的拒收 /* 我们需要编写消费端拒绝接收消息,当处理业务出现异常时,就需要设置拒绝签收 注意:消费者拒绝接收, 消息不重回队列 requeue=false,这样它才会进入死信队列 */ @Test public void dlxTest3() { String message = "DLX TEST SUCCESS,消费端发送消息了"; rabbitTemplate.convertAndSend("test_exchangeName_dlx", "test.dlx.cn", message); }消息拒签的消费端代码
配置文件
<!--测试死信队列,拒收消息监听器--> <rabbit:listener ref="dlxListener" queue-names="test_queueName_dlx"></rabbit:listener>拒收监听器代码
package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* 测试死信,让消费端拒绝签收 */ @Component public class DlxListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String(message.getBody())); //手动制作一个异常 int i=1/0; System.out.println("处理业务"); //签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { // e.printStackTrace(); System.out.println("出现异常了,这里拒绝签收。。。"); //拒绝签收,注意,不重回队列 requeue=false; channel.basicNack(deliveryTag,true,false); } } }测试
死信交换机和死信队列和普通的没有区别当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列 如果没有绑定死信交换机,那么消息过期后就丢了 消息成为死信的三种情况:队列消息长度到达限制;
消费者拒接消费消息,并且不重回队列;
原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
注意:在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
提出需求:
下单后,30分钟未支付,取消订单,回滚库存。编写配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--TTL Queue--> <rabbit:queue id="test_queue_ttl" name="test_queueName_ttl"> <rabbit:queue-arguments> <!--设置怎个队列的消息存活时间,这里我设置了10秒--> <!--注意,它的值是number类型的,所以我们需要指定它的value类型--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--TTL Exchange--> <rabbit:topic-exchange name="test_exchangeName_ttl"> <!--绑定queue--> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 死信队列 声明正常的队列(test_queueName_dlx)和交换机(test_exchangeName_dlx) 声明死信队列queueName_dlx)和死信交换机(exchangeName_dlx) 正常队列绑定死信交换机 设置两个参数: x-dead-letter-exchange 表示死信交换机名称 x-dead-letter-routing-key 发送给死信交换机的routingKey --> <!--声明正常的队列--> <rabbit:queue id="test_queue_dlx" name="test_queueName_dlx"> <!--设置正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--x-dead-letter-exchange 表示死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchangeName_dlx"></entry> <!--x-dead-letter-routing-key 发送给死信交换机的routingKey--> <entry key="x-dead-letter-routing-key" value="dlx.cdw"></entry> <!-- 怎么让一个消息变为死信,三种方式 设置队列过期时间 ttl 设置长度限制 max-length --> <!--设置队列过期时间--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> <!--设置长度限制,我这里设置这个队列最大能存储10条消息,当存储到第11条消息时,第11条消息就是死信消息了--> <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--定义正常交换机--> <rabbit:topic-exchange name="test_exchangeName_dlx"> <!--将正常的队列绑定到正常的交换机--> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--声明死信队列--> <rabbit:queue id="queue_dlx" name="queueName_dlx"></rabbit:queue> <!--声明死信交换机--> <rabbit:topic-exchange name="exchangeName_dlx"> <!--将死信队列绑定到死信交换机--> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 延迟队列(TTL+DLX组合实现) 定义正常交换机(order_exchangeName)和队列(order_queueName) 定义死信交换机(order_exchangeName_dlx)和队列(order_queueName_dlx) 绑定正常队列到死信交换机 设置两个参数: x-dead-letter-exchange 表示死信交换机名称 x-dead-letter-routing-key 发送给死信交换机的routingKey --> <!--定义正常queue--> <rabbit:queue id="order_queue" name="order_queueName"> <!--绑定死信交换机--> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchangeName_dlx"></entry> <entry key="x-dead-letter-routing-key" value="order.dlx.cancel"></entry> <!--设置队列过期时间,我们实现的需求是30分钟,但是我们为了测试,就写10秒测试了--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--定义正常的exchange--> <rabbit:topic-exchange name="order_exchangeName"> <!--绑定正常的队列--> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--定义死信队列--> <rabbit:queue id="order_queue_dlx" name="order_queueName_dlx"></rabbit:queue> <!--定义死信交换机--> <rabbit:topic-exchange name="order_exchangeName_dlx"> <!--绑定死信队列--> <rabbit:bindings> <rabbit:binding pattern="order.dlx.#" queue="order_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> </beans>编写生产端
/* 队列延迟 注意,我们需要先把队列和交换机创建出来(随便执行一个测试方法发送一条消息即可),要不然后面我们先启动消费者没有找到队列和交换机就报错了 */ @Test public void delayTest()throws Exception { //模拟发送订单信息,将来是在订单系统中,下单成功后,发送消息 String message="订单信息: id=1,time=2019年12月30日19:41:56"; rabbitTemplate.convertAndSend("order_exchangeName", "order.msg", message); //打印倒计时10秒 for (int i = 10; i > 0; i--) { System.out.println(i); Thread.sleep(1000); } }编写消费端监听器
package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* 延迟队列的模拟订单过时测试 */ @Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println(new String(message.getBody())); System.out.println("处理业务逻辑"); System.out.println("根据订单id查询其状态"); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存"); //签收 channel.basicAck(deliveryTag, true); }catch (Exception e){ // e.printStackTrace(); System.out.println("出现异常,拒绝签收"); //拒签 channel.basicNack(deliveryTag, true, true); } } }配置注册消费端监听器
<!-- 消费端订单监听器 延迟队列效果实现:一定要监听的是死信队列 注意:它应该监听的是这个死信的队列,因为这个死信队列有延迟的功能 如果监听正常的队列那么,消息一来就到了,没启动需求的作用 --> <rabbit:listener ref="orderListener" queue-names="order_queueName_dlx"></rabbit:listener>测试:启动消费端,再启动生产端,注意控制台打印,消费端需要等待10秒才会收到死信队列的消息,因为死信队列有延迟队列功能(TTL+DLX实现)
小结
延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
查看虚拟机命令: rabbitmqctl list_vhosts
查看连接命令: rabbitmqctl list_connections
查看exchanges:rabbitmqctl list_exchanges
查看消费者信息:rabbitmqctl list_consumers
查看环境变量:rabbitmqctl environment
查看未被确认的队列:rabbitmqctl list_queues name messages_unacknowledged
查看单个队列的内存使用:rabbitmqctl list_queues name memory
查看准备就绪的队列:rabbitmqctl list_queues name messages_ready
注意使用guest用户,虚拟机使用 / 默认的
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。维人员进行问题的定位。
将队列绑定到amq.rabbitmq.trace,路由key为#,所有的请求消息它都发送
rabbitmqctl trace_on:开启Firehose命令
注意:打开 trace 会影响消息写入功能,适当打开后请关闭
建议:在开发阶段我们可以开启消息追踪,在实际生产环境建议将其关闭
rabbitmqctl trace_off:关闭Firehose命令
我们向队列发送一条消息
我们发现当前消息也正常存在,并且开启消息追踪后,会多出一条消息是 amq.rabbitmq.trace 交换机发给当前队列的消息,消息中的内容是比较完整的
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
启用插件:rabbitmq-plugins enable rabbitmq_tracing
查看插件命令:
先输入命令:rabbitmq-plugins
列表查看: rabbitmq-plugins list
e* 表示启用的
建议:在开发阶段我们可以开启消息追踪插件,在实际生产环境不建议建议开启,除非是非常特殊的业务场景
关闭插件:rabbitmq-plugins disable rabbitmq_tracing
关闭Firehose:rabbitmqctl trace_off
