RabbitMQ之个人记录

    科技2022-08-18  96

    RabbitMQ

    RabbitMQ有六种不同的工作模式

    参考: https://www.rabbitmq.com/getstarted.html

    www.rabbitmq---->Get Started------------>RabbitMQ Tutorials

    使用简单模式完成消息传递

    创建工程(生产者、消费者)

    添加依赖

    给生产者和消费者配置maven坐标依赖

    <dependencies> <!--RabbitMQ java客户端依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies> <build> <plugins> <!--编译插件--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins>

    编写生产者发送消息

    package cn.cdw.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /* 消费者 */ public class Producer { public static void main(String[] args)throws Exception { //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置访问ip connectionFactory.setHost("192.168.147.133"); //默认为localhost //设置访问端口号 connectionFactory.setPort(5672); //默认端口号为5672 //设置虚拟机 connectionFactory.setVirtualHost("/cdw");//虚拟机,默认值为: / //设置登录用户 connectionFactory.setUsername("cdw"); //用户名,默认为guest //密码 connectionFactory.setPassword("cdw"); //密码: 默认为guest //创建连接 Connection connection = connectionFactory.newConnection(); //创建管道 Channel channel = connection.createChannel(); //通过channel创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数详解: 参数1:queue 表示队列名称 参数2: durable 表示是否持久化,当RabbitMQ重启后数据还在 参数3: exclusive 表示是否独占,只能有一个消费者监听这队列 当connection关闭时,是否删除队列 参数4:autoDelete 表示是否自动删除,当没有consumer消费者时,自动删除 参数5: arguments 参数 */ //如果没有一个名字叫rabbitmqQueueName的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("rabbitmqQueueName", true, false, false, null); //通过channel发送消息 /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数详解: 参数1: exchange 表示交换机名称。简单模式下交换机使用默认的 参数2: routingKey 表示路由名称,如果使用默认的交换机,那么这个参数必须指定和队列名称一样 参数3: props 表示配置信息 参数4: body 表示消息数据 */ //body String body = "RabbitMQ Body TEST SUCCESS"; channel.basicPublish("", "rabbitmqQueueName", null, body.getBytes()); channel.close(); connection.close(); } }
    测试:

    运行,然后访问 http://192.168.147.133:15672/#/queues 如图

    编写消费者接收消息

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ消费端 */ public class Consumer { public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //设置HOST,默认为localhost connectionFactory.setHost("192.168.147.133"); //设置PORT,默认为5672 connectionFactory.setPort(5672); //设置虚拟机,默认为 / connectionFactory.setVirtualHost("/cdw"); //设置username connectionFactory.setUsername("cdw"); //设置password connectionFactory.setPassword("cdw"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建管道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("rabbitmqQueueName", true, false, false, null); /* 回调对象的处理 回调方法,当收到消息后会自动执行该方法 handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 参数详解: 参数1:consumerTag 表示标识 参数2: envelope 表示可以用来获取一下信息,比如交换机的信息,路由的信息等 参数3: properties 表示配置的信息 参数4: body 表示数据 */ DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //我们把这些参数打印到控制台看一下 System.out.println("consumerTag: "+consumerTag); System.out.println("Exchange: "+envelope.getExchange()); System.out.println("RoutingKey: "+envelope.getRoutingKey()); System.out.println("properties: "+properties); System.out.println("body: " + body); } }; //接收消息 /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数详解: 参数1:queue 表示队列名称 参数2: autoAck 表示自动确认。就比如说消费者收到消息了,就跟RabbitMQ说一下,我收到消息了 参数3: callback 表示回调对象 */ channel.basicConsume("rabbitmqQueueName",true,consumer); //消费者是否关闭资源,当然是不关闭资源了,它应该要一直监听RabbitMQ消息, //不关闭资源,应该一直监听消息 channel.close(); connection.close(); } }
    测试:

    运行,看控制台的打印信息

    RabbitMQ的工作模式

    对于一些任务繁重,或任务较多的情况下使用工作模式可以提高任务处理的速度

    一个生产者,两个消费者演示

    生产者
    package cn.cdw.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /* RabbitMQ工作模式消费端 */ public class ProducerWorkQueue { public static void main(String[] args)throws Exception { //参数信息 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //queue channel.queueDeclare("workQueue", true, false, false, null); //发送消息 /* 因为RabbitMQ的工作模式,一个生产者两个消费者,所以我们使用循环生产多个数据,测试效果才看得出来 */ for (int i = 1; i <= 10; i++) { //消息 String body = i + "workQueue SUCCESS"; channel.basicPublish("", "workQueue", null, body.getBytes()); System.out.println(body); } //关闭资源 channel.close(); connection.close(); } }
    消费者1
    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ工作模式的消费端1 */ public class ConsumerWorkQueue1 { public static void main(String[] args)throws Exception { //设置信息 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //queue channel.queueDeclare("workQueue", true, false, false, null); //回调数据方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }; //接收消息 channel.basicConsume("workQueue", true, consumer); //不关闭资源,一直监听RabbitMQ消息 } }
    消费者2
    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ工作模式消费端2 */ public class ConsumerWorkQueue2 { public static void main(String[] args)throws Exception { //设置信息ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //queue channel.queueDeclare("workQueue", true, false, false, null); //回调处理方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }; //接收消息 channel.basicConsume("workQueue", true, consumer); //不关闭资源,一直监听RabbitMQ消息 } }
    测试

    先运行两消费者,消费者一直监听这RabbitMQ

    然后启动生产者,看看消费者的控制台发现它们分担了处理

    Pub/Sub 订阅模式

    交换机需要和队列进行绑定,绑定之后;一个消息可以被多个消费者收到

    生产者

    package cn.cdw.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import javax.management.Query; /* RabbitMQ的订阅模式 */ public class ProducerPusSub { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //创建交换机 /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数详解 参数1:exchange 交换机名称 参数2:type 表示交换机的类型 DIRECT("direct"), 表示定向的方式 FANOUT("fanout"), 表示扇形(广播)的方式,意思就是当前这个交换机会把消息发送给与之绑定的所有队列 TOPIC("topic"), 表示通配符的方式 HEADERS("headers"); 表示参数匹配,用的非常的少 参数3:durable 是否持久化 参数4:autoDelete 是否自动删除 参数5:internal 一般设为false,内部使用 参数6:参数 */ //交换机名称 String exchangeName = "pubSubExchangeName"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //创建队列 //定义两个队列名称 String queueName1 = "PubSubQueue1"; String queueName2 = "pubSubQueue2"; channel.queueDeclare(queueName1, true, false, false, null); channel.queueDeclare(queueName2, true, false, false, null); //绑定队列交换机 /* queueBind(String queue, String exchange, String routingKey) 参数详解 参数1: queue 表示队列的名称 参数2: exchange 表示交换机的名称 参数3:routingKey 表示路由键,绑定规则 如果交换机的类型为FANOUT广播类型,那么routingKey设置为"" 空 */ channel.queueBind(queueName1, exchangeName, ""); channel.queueBind(queueName2, exchangeName, ""); //发送消息 //body String body = "PubSub TEST SUCCESS"; channel.basicPublish("pubSubExchangeName", "", null, body.getBytes()); //关闭资源 channel.close(); connection.close(); } }

    消费者1

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ订阅模式消费者1 */ public class ConsumerPubSub1 { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //创建Exchange String exchangeName = "pubSubExchangeName"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //创建queue String queueName1 = "PubSubQueue1"; //String queueName2 = "pubSubQueue2"; channel.queueDeclare(queueName1, true, false, false, null); //channel.queueDeclare(queueName1, true, false, false, null); //绑定交换机 channel.queueBind(queueName1, exchangeName, ""); //channel.queueBind(queueName2, exchangeName, ""); //接收消息 //回调方法的处理 DefaultConsumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); System.out.println("队列1,我让它执行findAll方法"); } }; channel.basicConsume(queueName1, true, consumer); } }

    消费者2

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; //RabbitMQ订阅模式消费者2 public class ConsumerPubSub2 { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //Exchange String exchangeName = "pubSubExchangeName"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //queue String queueName2 = "PubSubQueue2"; channel.queueDeclare(queueName2, true, false, false, null); //绑定交换机 channel.queueBind(queueName2, exchangeName, ""); //接收消息 //处理回调对象方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); System.out.println("我让这个队列把数据存储到数据库中"); } }; //接收消息 channel.basicConsume(queueName2, true, consumer); } }
    测试

    启动所有的消费者

    然后使用生产者发送消息,在每个消费者对应的控制台可以查看到生产者发送的所有消息

    Routing 路由模式

    Routing路由模式要求队列在绑定交换机是要指定routing key,消息会转发的符合的routing key的队列

    生产者

    package cn.cdw.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /* RabbitMQ路由模式生产者 */ public class ProducerRouting { public static void main(String[] args) throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置数据 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //Channel Channel channel = connection.createChannel(); //Exchange , 注意:Exchange类型必须为Direct定向类型 String exchangeName = "RoutingExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); //queue String queueName1 = "RoutingQueue1"; String queueName2 = "RoutingQueue2"; channel.queueDeclare(queueName1, true, false, false, null); channel.queueDeclare(queueName2, true, false, false, null); //绑定Exchange,测试队列1保存异常信息到数据看,所以定义它的路由名称为error //然后队列2 保存info warning error在控制台显示,所以队列2的路由名称为info,warning,error channel.queueBind(queueName1, exchangeName, "error"); channel.queueBind(queueName2, exchangeName, "info"); channel.queueBind(queueName2, exchangeName, "warning"); channel.queueBind(queueName2, exchangeName, "error"); //发送消息 //队列1的消息: String body1 = "Routing TEST SUCCESS,把异常的信息保存到数据库ERROR"; //队列2的消息 String body2 = "Routing TEST SUCCESS,把info,warning,error的信息显示在控制台INFO,WARNING,ERROR"; channel.basicPublish(exchangeName, "error", null, body1.getBytes()); channel.basicPublish(exchangeName, "info", null, body2.getBytes()); channel.basicPublish(exchangeName, "warning", null, body2.getBytes()); channel.basicPublish(exchangeName, "error", null, body2.getBytes()); //关闭资源 channel.close(); connection.close(); } }

    消费者1

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ的路由模式的消费者1 */ public class ConsumerRouting1 { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //Exchange,路由模式的交换机类型必须为Direct String exchangeName = "RoutingExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); //queue String queueName1 = "RoutingQueue1"; channel.queueDeclare(queueName1, true, false, false, null); //绑定Exchange channel.queueBind(queueName1, exchangeName, "error"); //接收方法 //回调对象的处理方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); System.out.println("测试我要把这个队列1的异常信息存储到数据库,ERROR"); } }; //接收 channel.basicConsume(queueName1, true, consumer); } }

    消费者2

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ的路由模式的消费者2 */ public class ConsumerRouting2 { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //Exchange String exchangeName = "RoutingExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); //queue String queueName2 = "RoutingQueue2"; channel.queueDeclare(queueName2, true, false, false, null); //绑定Exchange channel.queueBind(queueName2, exchangeName, "error"); channel.queueBind(queueName2, exchangeName, "info"); channel.queueBind(queueName2, exchangeName, "warning"); //接收消息,回调对象的处理方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); System.out.println("测试我们将队列2把INFO,WARNING,ERROR的异常在控制台显示 INFO,WARNING,ERROR"); } }; //接收 channel.basicConsume(queueName2, true, consumer); } }

    测试

    启动所有消费者,然后使用生产者发送消息

    查看控制台的打印信息

    Topics通配符模式

    Topic通配符模式可以实现PusSub发布与订阅模式和Routing路由模式

    Topic通配符模式在路由名称上使用通配符,显得更加灵活

    使用通配符:

    *:表示一个词

    例如:cdw.*

    cdw.test1 可以匹配到

    cdw.test1.test2 这不止一个词了,所以匹配不到

    #:表示至少一个词和多个词

    例如:cdw.#

    dw.test1 可以匹配到

    cdw.test1.test2 这不止一个词,所以也可以匹配到

    cdw.# : 表示以cdw开头的都能匹配到

    #.cdw:表示以cdw结尾的都能匹配到

    生产者

    package cn.cdw.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /* RabbitMQ通配符模式 */ public class ProducerTopic { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //Exchange , Exchange类型必须指定Topic String exchangeName = "TopicExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); //注意,这里我没有定义queue和绑定Exchange,我们在消费端会定义队列和绑定,所以这里不用写也是可以的 //发送消息 String body1 = "Topic TEST SUCCESS 第一条消息 ERROR"; String body2 = "Topic TEST SUCCESS 第二条消息 ERROR,WARNING,INFO"; channel.basicPublish(exchangeName, "cdw.error", null, body1.getBytes()); channel.basicPublish(exchangeName, "cdw.cn.error", null, body2.getBytes()); channel.basicPublish(exchangeName, "cdw.cn.warning", null, body2.getBytes()); channel.basicPublish(exchangeName, "cdw.cn.info", null, body2.getBytes()); //关闭资源 channel.close(); connection.close(); } }

    消费者1

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ匹配模式消费者1 */ public class ConsumerTopic1 { public static void main(String[] args)throws Exception { //ConnectionFactory ConnectionFactory connectionFactory=new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //exchange String exchangeName = "TopicExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); //queue String queueName1 = "TopicQueue1"; channel.queueDeclare(queueName1, true, false, false, null); //绑定交换机 channel.queueBind(queueName1, exchangeName, "cdw.*"); //表示匹配cdw.后只有一个词的,例如cdw.error //接收消息 //回调对象的处理方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "utf-8")); System.out.println("这是一个异常信息,我要把它存入到数据库"); } }; //接收 channel.basicConsume(queueName1, true, consumer); } }

    消费者2

    package cn.cdw.consumer; import com.rabbitmq.client.*; import java.io.IOException; /* RabbitMQ匹配模式消费者2 */ public class ConsumerTopic2 { public static void main(String[] args) throws Exception{ //ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); //设置信息 connectionFactory.setHost("192.168.147.133"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/cdw"); connectionFactory.setUsername("cdw"); connectionFactory.setPassword("cdw"); //Connection Connection connection = connectionFactory.newConnection(); //channel Channel channel = connection.createChannel(); //Exchange String exchangeName = "TopicExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); //queue String queueName2 = "TopicQueue2"; channel.queueDeclare(queueName2, true, false, false, null); //绑定Exchange channel.queueBind(queueName2, exchangeName, "cdw.#");//表示陪陪cdw.至少一个词和多个词的,例如,cdw.cn.error,cdw.cn.info,chw.cn.warning //接收消息 //回调对象的处理方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "utf-8")); System.out.println("这个队列2,我让它把ERROR,INFO,WARNING信息在控制台显示"); } }; //接收 channel.basicConsume(queueName2, true, consumer); } }

    测试

    以上RabbitMQ的五种模式 总结

    简单模式(HelloWorld)

    一个生产者,一个消费者,不需要设置交换机(使用默认交换机)

    工作队列模式(Work Queue)

    一个生产者,多个消费者(竞争关系),不需要设置 交换机(使用默认交换机)

    发布订阅模式(Publish/Subscribe)

    需要设置类型为FANOUT的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机将消息发送到绑定的队列

    路由模式 (Routing)

    需要设置类型为DIRECT的交换机,交换机和队列进行绑定,并且知道routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

    通配符模式(Topic)

    需要设置类型为TOPIC的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机很根据routing key将消息发送到对应的队列

    Spring整合RabbitMQ

    创建生产者

    新建生产者和消费者工程

    配置生产者和消费者引入maven坐标

    <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>

    编写配置文件RabbitMQ的连接参数(rabbitmq.properties),生产者和消费者都需要配置

    rabbitmq.host=192.168.147.133 rabbitmq.port=5672 rabbitmq.username=cdw rabbitmq.password=cdw rabbitmq.virtual-host=/cdw

    生产者的配置文件(spring-rabbitmq-producer.xml)

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机 默认交换机类型为direct,名字为:"",路由键为队列的名称 --> <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/> <!--定义广播类型交换机;并绑定上述两个队列--> <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_1"/> <rabbit:binding queue="spring_fanout_queue_2"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/> <!--定义广播交换机中的持久化队列,不存在则自动创建--> <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/> <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="cdw.*" queue="spring_topic_queue_star"/> <rabbit:binding pattern="cdw.#" queue="spring_topic_queue_well"/> <rabbit:binding pattern="cn.#" queue="spring_topic_queue_well2"/> </rabbit:bindings> </rabbit:topic-exchange> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>

    发送消息测试

    package cn.cdw.rabbitmq.producer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; /* 测试发送消息 */ @Test public void QueueTest() { rabbitTemplate.convertAndSend("spring_queue", "RabbitMQ的简单模式,只发送队列spring_queue的消息"); } /* 发送广播 参数1:交换机的名称 参数2:路由名称,广播的话,路由名称为空 参数3:数据内容 */ @Test public void fanoutTest() { rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送的广播消息,发送到spring_fanout_exchange交换机广播消息"); } /* 通配符topic发送消息 参数1:交换机名称 参数2:路由键名 参数3:发送消息内容 */ @Test public void topicTest() { rabbitTemplate.convertAndSend("spring_topic_exchange","cdw.test","使用通配符发送的消息,发送到spring_topic_exchange交换机的cdw.test"); rabbitTemplate.convertAndSend("spring_topic_exchange","cdw.test.cn","使用通配符发送的消息,发送到spring_topic_exchange交换机的cdw.test.cn"); rabbitTemplate.convertAndSend("spring_topic_exchange","cn.test.cdw","使用通配符发送的消息,发送到spring_topic_exchange交换机的cn.test.cdw"); } }

    创建消费者工程

    导入maven坐标

    创建配置文件

    rabbitmq.propreties
    rabbitmq.host=192.168.147.133 rabbitmq.port=5672 rabbitmq.username=cdw rabbitmq.password=cdw rabbitmq.virtual-host=/cdw
    spring-rabbitmq-consumer.xml

    编写代码(必须实现MessageListener)

    和配置文件的要对上

    然后重新onMessage方法,然后直接打印就可以

    package cn.cdw.rabbitmq.listener; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class TopicListenerWell2 implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); } }

    然后写有一个测试方法,让它一直不停

    package cn.cdw.rabbitmq.listener; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class QueueTest { @Test public void queueTest() { boolean flag=true; while (true) { } } }

    Spring Boot整合RabbitMQ

    生产者:

    创建springboot工程

    新建一个maven工程

    引入依赖坐标

    <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <!--RabbitMQ的启动器依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--测试环境依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency>

    编写yaml配置,基本信息配置

    #配置RabbitMQ的基本信息 ip,端口,虚拟机,用户名,密码 spring: rabbitmq: host: 192.168.147.133 port: 5672 virtual-host: /cdw username: cdw password: cdw

    定义交换机,队列及绑定关系的配置类

    package cn.cdw.producer.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* RabbitMQ配置类 */ @Configuration public class RabbitMQConfig { //交换机名称 public static final String topicExchangeName = "SpringBootTopicExchange"; //队列名称 public static final String queueName = "SpringBootQueue"; //声明交换机进行注册到Bean中 @Bean("topicExchange") public Exchange topicExchange() { return ExchangeBuilder.topicExchange(topicExchangeName).durable(true).build(); } //声明队列注册到Bean中 @Bean("queue") public Queue queue() { return QueueBuilder.durable(queueName).build(); } //绑定队列和交换机 @Bean public Binding queueExchange(@Qualifier("queue") Queue queue, @Qualifier("topicExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("cdw.#").noargs(); } }
    然后编写一个测试类

    注入RabbitmqTemplate,调用方法,完成消息发送

    package cn.cdw.producer; import cn.cdw.producer.config.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner.class) public class RabbitMQConfigTest { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; //编写一个测试方法发送消息 @Test public void sendTest() { String message = "Spring Boot TEST SUCCESS"; rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "cdw.test", message); } }

    ​ 然后运行测试方法,查看RabbitMQ中是否有交换机,队列和消息

    消费端编写

    创建消费者工程

    导入依赖坐标

    <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <!--RabbitMQ的启动器依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--测试环境依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>

    编写启动器类和yml配置文件

    启动器类
    package cn.cdw.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
    yml配置文件
    #配置RabbitMQ的基本信息 ip,端口,虚拟机,用户名,密码 spring: rabbitmq: host: 192.168.147.133 port: 5672 virtual-host: /cdw username: cdw password: cdw

    定义监听器,使用@RabbitListener注解完成队列监听

    package cn.cdw.consumer; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MyListener { /* 编写一个方法,使用注解@RabbitListener标准监听的队列 */ @RabbitListener(queues = "SpringBootQueue") public void myListener(String message) { System.out.println(message); } }

    然后运行消费者的启动器类

    如果没有数据了,就使用生产者生产数据,然后在启动消费者

    RabbitMQ的高级特性

    消息可靠性传递

    杜绝任何消息丢失或传递失败场景

    RabbitMQ提供了两种方式用来控制消息的投递可靠性模式

    confirm 确认模式

    return 退回模式

    实现

    创建消费者和生产者工程
    给生产者和消费者导入依赖坐标
    <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
    生产者和消费者的RabbitMQ的基本配置信息
    rabbitmq.properties
    rabbitmq.host=192.168.147.133 rabbitmq.port=5672 rabbitmq.username=cdw rabbitmq.password=cdw rabbitmq.virtual-host=/cdw
    编写生产者配置文件
    rabbitmq-producer.xml
    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> </beans>
    编写测试类
    Confirm确认模式代码实现

    消息从Producer到Exchange,则会返回一个confirmCallBack

    package cn.cdw.producer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; //运行环境 @RunWith(SpringJUnit4ClassRunner.class) //加载配置文件 @ContextConfiguration(locations = "classpath:rabbitmq-producer.xml") public class ProducerTest { //注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; /* 确认模式 步骤: 确认模式的开启: ConnectionFactory中开启 在配置文件中配置 publisher-confirms="true" ,开启确认模式,它模式是false的 使用RabbitTemplate定义ConfirmCallBack回调函数 true 表示发送消息到交换机成功 false 表示发送消息到交换机失败 */ //编写一个测试方法 @Test public void test() { //使用RabbitTemplate定义ConfirmCallBack回调函数 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { /* confirm(CorrelationData correlationData, boolean ack, String cause) 参数详解: 参数1:correlationData 相关的配置信息 参数2: ack exchange交换机是否成收到消息。true 成功,false失败 参数3: cause 失败的原因 */ if (ack) { //exchange接收消息成功 System.out.println("exchange接收消息成功: " + cause); }else { //如果想测试它失败,我们只需要修改下面的发送消息的交换机的名称不存在的就可以,因为它没法发送消息到交换机,所以当然失败了 System.out.println("exchange接收消息失败: " + cause); //做一些处理,让消息再次发送 } System.out.println("confirm方法被执行了。。。。"); } }); //测试发送一条消息,上面confirm方法是否执行 String message = "Confirm TEST SUCCESS ,我已经发送了消息"; //使用RabbitTemplate给交换机发送一条消息 rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName", "confirm_routingKey", message); //rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName_test", "confirm_routingKey", message); } }
    Return退回模式代码实现

    当消息发送给Exchange后,Exchange路由到Queue失败时才会执行ReturnBack

    /* 测试Return回退模式 */ @Test public void testReturn() { /* 当消息发送给Exchange后,Exchange路由到Queue失败时,才会执行ReturnCallBack 步骤 开启回退模式 在配置文件中设置开启 publisher-returns="true" 设置ReturnCallBack 设置Exchange处理消息的模式(两种模式) 如果消息没有路由到Queue,则丢弃消息(默认) 如果消息没有路由到Queue,返回消息发送方,就是ReturnCallBack */ //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true);//设为true,它就不会执行默认的把消息丢弃了,而是将消息返回消息发送方 //使用RabbitTemplate定义ReturnCallBack回调函数 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //如果正常这么测试,这句话是不会执行到的,因为下面发送消息到Exchange到Queue都是正常的 //测试的话,我们可以把它路由的名称改为一个不存在的,达到Exchange发送不到Queue不就可以了, //但是是达到模式了,它还是没有打印出这句话 ,因为我们没有设置它的处理模式,默认它就把消息给丢弃了,所以也不会执行这个方法 //所以我们需要设置处理的模式 rabbitTemplate.setMandatory(true) /* returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 参数详解: 参数1: message 表示消息对象 参数2: replyCode 表示回复的错误码 参数3:replyText 表示回复的错误的信息 参数4:exchange 表示交换机 参数5:routingKey 表示路由键 */ //我们把这些参数都打印出来看看 System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); System.out.println("return方法被执行了。。。"); } }); //使用RabbitTemplate发送消息 String message = "Return TEST SUCCESS , 回退模式测试..."; // rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName", "confirm_routingKey", message); rabbitTemplate.convertAndSend("test_exchange_confirm_exchangeName", "confirm_routingKey_test", message); }

    Consumer ACK

    ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。

    有三种确认方式:

    • 自动确认:acknowledge=“none”

    • 手动确认:acknowledge=“manual”

    • 根据异常情况确认:acknowledge=“auto”

    创建消费端

    创建

    导入依赖

    <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>

    编写RabbitMQ的基本信息的配置文件rabbitmq.properties

    rabbitmq.host=192.168.147.133 rabbitmq.port=5672 rabbitmq.username=cdw rabbitmq.password=cdw rabbitmq.virtual-host=/cdw

    自动签收

    编写消费端的配置文件consumer-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--=============================================================================================================--> <!--定义监听器bean,我创建一个包,专门放置监听器,然后我们去扫描这些包,就加载到bean了,这样我们就不需要定义多个bean了--> <!--然后我只需要在监听器上使用注解@Component标注为是一个组件,包扫描就可以扫描出来了--> <context:component-scan base-package="cn.cdw.consumer.listener"></context:component-scan> <!--定义监听器--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="ackListener" queue-names="test_confirm_queueName"></rabbit:listener> </rabbit:listener-container> </beans>

    编写监听器

    package cn.cdw.consumer.listener; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component; /* Consumer ACK机制 */ //标注为是一个组件 @Component public class AckListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(new String(message.getBody())); } }

    编写测试类,启动监听器监听MQ队列

    package cn.cdw.consumer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /* 编写一个测试方法,让消费者一直运行着的,所以使用while(true) */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test() { while (true) { } } }

    测试,启动测试类,然后启动生产者生产数据

    手动签收

    配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--=============================================================================================================--> <!--定义监听器bean,我创建一个包,专门放置监听器,然后我们去扫描这些包,就加载到bean了,这样我们就不需要定义多个bean了--> <!--然后我只需要在监听器上使用注解@Component标注为是一个组件,包扫描就可以扫描出来了--> <context:component-scan base-package="cn.cdw.consumer.listener"></context:component-scan> <!--定义监听器--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!--自动签收监听器--> <!-- <rabbit:listener ref="ackListener" queue-names="test_confirm_queueName"></rabbit:listener> --> <!--手动签收监听器--> <rabbit:listener ref="ackListenerManual" queue-names="test_confirm_queueName"></rabbit:listener> </rabbit:listener-container> </beans>

    监听器

    package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* Consumer ACK机制 设置手动签收: 在配置文件中配置 acknowledge="manual" 让监听器实现ChannelAwareMessageListener这个接口,重写onMessage方法 如果消息成功处理,则调用channel的basicAck()签收 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer */ @Component public class AckListenerManual implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); //获取deliveryTag Long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //接收转换消息 System.out.println(new String(message.getBody())); //处理业务逻辑 System.out.println("处理业务"); //我们在这里设置一个异常,测试处理业务出错了 int i = 1 / 0; //手动签收 /* basicReject(long deliveryTag, boolean multiple) 参数1:deliveryTag 获取deliveryTay Long deliveryTag = message.getMessageProperties().getDeliveryTag() 参数2: multiple 是否需要签收多条消息 */ channel.basicAck(deliveryTag, true); //这就是消息处理成功的正常签收,如果消息处理失败,我们应该拒绝签收,所以我们这里使用try{}catch(){}包裹起来 } catch (Exception e) { // e.printStackTrace(); //拒绝签收 /* basicNack(long deliveryTag, boolean multiple, boolean requeue) 参数1: deliveryTag 获取deliveryTay Long deliveryTag = message.getMessageProperties().getDeliveryTag() 参数2: multiple 是否需要签收多条消息 参数3:requeue 表示重回队列,如果设置为true,则消息重新回到queue,broker会重新发送给消息给消费端 */ channel.basicNack(deliveryTag, true, true); //还有一种方法,但是还是推荐使用上面这个方法 /* basicReject(long deliveryTag, boolean requeue) */ //channel.basicReject(deliveryTag, true); System.out.println("处理业务出错了,我要把消息返回给queue了,你重新再给我发消息吧。。"); } } }

    测试

    总结

    在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

    如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

    如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

    消费端限流

    编写配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <!--=============================================================================================================--> <!--定义监听器bean,我创建一个包,专门放置监听器,然后我们去扫描这些包,就加载到bean了,这样我们就不需要定义多个bean了--> <!--然后我只需要在监听器上使用注解@Component标注为是一个组件,包扫描就可以扫描出来了--> <context:component-scan base-package="cn.cdw.consumer.listener"></context:component-scan> <!--定义监听器--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <!--自动签收监听器--> <!-- <rabbit:listener ref="ackListener" queue-names="test_confirm_queueName"></rabbit:listener> --> <!--手动签收监听器--> <!-- <rabbit:listener ref="ackListenerManual" queue-names="test_confirm_queueName"></rabbit:listener>--> <!--消费端限流--> <rabbit:listener ref="qosListener" queue-names="test_confirm_queueName"></rabbit:listener> </rabbit:listener-container> </beans>

    限流监听器

    package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* Consumer 限流机制 编写ack机制为ack手动确认 acknowledge="manual" listener-container配置属性 prefetch = 1 , 表示消费端每次从mq中拉取一条消息来消费,直到手动确认消费完毕,才会继续拉取下一条消息。 */ @Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Long deliveryTag = message.getMessageProperties().getDeliveryTag(); //获取消息 System.out.println(new String(message.getBody())); //处理业务逻辑 System.out.println("处理业务逻辑"); //签收,测试我先不手动签收(把它注释掉),我设置了消费端限流每次从mq中拉去一条消息,当消费端全部消费完后进行签收后,才会再去mq中拉去一条消息, //测试结果这里只会拉取一条消息,因为我这里我没有手动签收,所以它不会去拉取下一条消息, //channel.basicAck(deliveryTag, true); //还有不签收,其实这里使用try,catch包裹的,测试我就不写了 //我们在消费端写一个方法,使用循环发送多条数据到mq中进行消费端限流的测试 } }

    测试

    小结

    在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息

    消费端的确认模式一定为手动确认。acknowledge=“manual”

    TTL(主要是生产端)

    TTL全称Time To Live(存活时间、过期时间)。当消息到达存活时间后,会被自动删除。RabbitMQ可以对消息设置过期时间,也可以对整个queue设置过期时间

    在RabbitMQ管理控制台设置过期时间

    代码实现设置过期时间

    队列过期时间实现

    编写配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--TTL Queue--> <rabbit:queue id="test_queue_ttl" name="test_queueName_ttl"> <rabbit:queue-arguments> <!--设置怎个队列的消息存活时间,这里我设置了10秒--> <!--注意,它的值是number类型的,所以我们需要指定它的value类型--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--TTL Exchange--> <rabbit:topic-exchange name="test_exchangeName_ttl"> <!--绑定queue--> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> </beans>

    发送消息

    /* TTL:过期时间 队列过期时间 */ @Test public void ttlTest() { String message = "TTL TEST SUCCESS,生产端已经发送了消息..."; for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("test_exchangeName_ttl", "ttl.cdw", message); } }

    查看RabbitMQ是否10秒后将队列的索引消息的清除了

    消息的单独过期

    编码

    /* TTL:过期时间 消息单独过期时间 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准 队列过期后,会将队列所有消息移除 消息过期后,只有在消息队列的顶端,才会判断其是否过期(移除掉) 就比如说,一共有5条消息,前面有我设置第3条消息5秒后过期,前面两条消息不过期,这时过5秒后,第3条消息是过期了,但是在队列中还是看到的(没有移除掉) 只有当它在消息队列的顶端,才会移除掉 */ @Test public void ttlTest2() { //消息处理对象MessagePostProcessor /* 设置一些消息的参数信息 设置message消息 返回该消息 */ MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置message信息 message.getMessageProperties().setExpiration("5000");//设置消息的过期时间,它的类型是字符串。我这里我设置了5秒过期 //返回该消息 return message; //然后把我们在外面单独声明的消息处理方法,添加到发送消息里边参数去 } }; String message = "TTL TEST SUCCESS,生产端已经发送了消息..."; rabbitTemplate.convertAndSend("test_exchangeName_ttl", "ttl.cdw", message, /* new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { return null; } } */ //上这样写不太直观,我们把消息处理对象单独声明出来,messagePostProcessor这个消息处理的参数,我们在上单独声明了处理方法了,这个把它传过来就可以刻 messagePostProcessor); }

    小结

    设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

    设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

    如果两者都进行了设置,以时间短的为准。

    死信队列(DLX)

    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    消息成为死信的三种情况

    队列消息长度到达限制; 就比如说,我设置了这个队列只能存储10条消息,长度为10,当第11条消息过来的时候,这个第11条消息就成为了死信 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;原队列存在消息过期设置,消息到达超时时间未被消费
    队列绑定死信交换机

    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

    配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--TTL Queue--> <rabbit:queue id="test_queue_ttl" name="test_queueName_ttl"> <rabbit:queue-arguments> <!--设置怎个队列的消息存活时间,这里我设置了10秒--> <!--注意,它的值是number类型的,所以我们需要指定它的value类型--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--TTL Exchange--> <rabbit:topic-exchange name="test_exchangeName_ttl"> <!--绑定queue--> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 死信队列 声明正常的队列(test_queueName_dlx)和交换机(test_exchangeName_dlx) 声明死信队列queueName_dlx)和死信交换机(exchangeName_dlx) 正常队列绑定死信交换机 设置两个参数: x-dead-letter-exchange 表示死信交换机名称 x-dead-letter-routing-key 发送给死信交换机的routingKey --> <!--声明正常的队列--> <rabbit:queue id="test_queue_dlx" name="test_queueName_dlx"> <!--设置正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--x-dead-letter-exchange 表示死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchangeName_dlx"></entry> <!--x-dead-letter-routing-key 发送给死信交换机的routingKey--> <entry key="x-dead-letter-routing-key" value="dlx.cdw"></entry> <!-- 怎么让一个消息变为死信,三种方式 设置队列过期时间 ttl 设置长度限制 max-length --> <!--设置队列过期时间--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> <!--设置长度限制,我这里设置这个队列最大能存储10条消息,当存储到第11条消息时,第11条消息就是死信消息了--> <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--定义正常交换机--> <rabbit:topic-exchange name="test_exchangeName_dlx"> <!--将正常的队列绑定到正常的交换机--> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--声明死信队列--> <rabbit:queue id="queue_dlx" name="queueName_dlx"></rabbit:queue> <!--声明死信交换机--> <rabbit:topic-exchange name="exchangeName_dlx"> <!--将死信队列绑定到死信交换机--> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> </beans>

    编写测试代码

    /* 死信队列 过期时间 长度限制 消息拒收 */ //过期时间测试 @Test public void dlxTest() { //测试过期时间,死信消息 //发送消息到到正常交换机然后发送到正常队列,这里我设置了10秒这个队列过期,10秒后呢,它将成为死信消息,会发送到死信队列中 String message = "Dlx TEST SUCCESS ,我生产端已经发送消息了..."; rabbitTemplate.convertAndSend("test_exchangeName_dlx", "test.dlx.cdw", message); } //长度限制测试 @Test public void dlxTest2() { //我在配置文件中设置了这个队列只能存储10条消息,超过10条消息后,超过得消息将成为死信消息劝,会发送到死信队列,我在配置中配置有过10秒整个队列过期,10秒后所有的消息将都发送到死信队列 //这里我使用循环发送20条数据 for (int i = 1; i <= 20; i++) { String message = i + " :DLX TEST SUCCESS,我会成为死信吗?"; rabbitTemplate.convertAndSend("test_exchangeName_dlx", "test.dlx.cn", message); } } //消息的拒收 /* 我们需要编写消费端拒绝接收消息,当处理业务出现异常时,就需要设置拒绝签收 注意:消费者拒绝接收, 消息不重回队列 requeue=false,这样它才会进入死信队列 */ @Test public void dlxTest3() { String message = "DLX TEST SUCCESS,消费端发送消息了"; rabbitTemplate.convertAndSend("test_exchangeName_dlx", "test.dlx.cn", message); }

    消息拒签的消费端代码

    配置文件

    <!--测试死信队列,拒收消息监听器--> <rabbit:listener ref="dlxListener" queue-names="test_queueName_dlx"></rabbit:listener>

    拒收监听器代码

    package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* 测试死信,让消费端拒绝签收 */ @Component public class DlxListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String(message.getBody())); //手动制作一个异常 int i=1/0; System.out.println("处理业务"); //签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { // e.printStackTrace(); System.out.println("出现异常了,这里拒绝签收。。。"); //拒绝签收,注意,不重回队列 requeue=false; channel.basicNack(deliveryTag,true,false); } } }

    测试

    死信交换机和死信队列和普通的没有区别当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列 如果没有绑定死信交换机,那么消息过期后就丢了 消息成为死信的三种情况:

    队列消息长度到达限制;

    消费者拒接消费消息,并且不重回队列;

    原队列存在消息过期设置,消息到达超时时间未被消费;

    延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

    注意:在RabbitMQ中并未提供延迟队列功能。

    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

    提出需求:

    下单后,30分钟未支付,取消订单,回滚库存。

    实现

    编写配置

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加载配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" /> <!--定义管理交换机、队列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--========================================================================================================================================--> <!--定义消息的可靠性传递(生产端)--> <!--name属性表示队列queue的名称--> <rabbit:queue id="test_queue_confirm" name="test_confirm_queueName"></rabbit:queue> <!--定义一个交换机--> <rabbit:direct-exchange name="test_exchange_confirm_exchangeName"> <!--将队列和交换机绑定--> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm_routingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--TTL Queue--> <rabbit:queue id="test_queue_ttl" name="test_queueName_ttl"> <rabbit:queue-arguments> <!--设置怎个队列的消息存活时间,这里我设置了10秒--> <!--注意,它的值是number类型的,所以我们需要指定它的value类型--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--TTL Exchange--> <rabbit:topic-exchange name="test_exchangeName_ttl"> <!--绑定queue--> <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 死信队列 声明正常的队列(test_queueName_dlx)和交换机(test_exchangeName_dlx) 声明死信队列queueName_dlx)和死信交换机(exchangeName_dlx) 正常队列绑定死信交换机 设置两个参数: x-dead-letter-exchange 表示死信交换机名称 x-dead-letter-routing-key 发送给死信交换机的routingKey --> <!--声明正常的队列--> <rabbit:queue id="test_queue_dlx" name="test_queueName_dlx"> <!--设置正常队列绑定死信交换机--> <rabbit:queue-arguments> <!--x-dead-letter-exchange 表示死信交换机名称--> <entry key="x-dead-letter-exchange" value="exchangeName_dlx"></entry> <!--x-dead-letter-routing-key 发送给死信交换机的routingKey--> <entry key="x-dead-letter-routing-key" value="dlx.cdw"></entry> <!-- 怎么让一个消息变为死信,三种方式 设置队列过期时间 ttl 设置长度限制 max-length --> <!--设置队列过期时间--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> <!--设置长度限制,我这里设置这个队列最大能存储10条消息,当存储到第11条消息时,第11条消息就是死信消息了--> <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--定义正常交换机--> <rabbit:topic-exchange name="test_exchangeName_dlx"> <!--将正常的队列绑定到正常的交换机--> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--声明死信队列--> <rabbit:queue id="queue_dlx" name="queueName_dlx"></rabbit:queue> <!--声明死信交换机--> <rabbit:topic-exchange name="exchangeName_dlx"> <!--将死信队列绑定到死信交换机--> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 延迟队列(TTL+DLX组合实现) 定义正常交换机(order_exchangeName)和队列(order_queueName) 定义死信交换机(order_exchangeName_dlx)和队列(order_queueName_dlx) 绑定正常队列到死信交换机 设置两个参数: x-dead-letter-exchange 表示死信交换机名称 x-dead-letter-routing-key 发送给死信交换机的routingKey --> <!--定义正常queue--> <rabbit:queue id="order_queue" name="order_queueName"> <!--绑定死信交换机--> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchangeName_dlx"></entry> <entry key="x-dead-letter-routing-key" value="order.dlx.cancel"></entry> <!--设置队列过期时间,我们实现的需求是30分钟,但是我们为了测试,就写10秒测试了--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry> </rabbit:queue-arguments> </rabbit:queue> <!--定义正常的exchange--> <rabbit:topic-exchange name="order_exchangeName"> <!--绑定正常的队列--> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--定义死信队列--> <rabbit:queue id="order_queue_dlx" name="order_queueName_dlx"></rabbit:queue> <!--定义死信交换机--> <rabbit:topic-exchange name="order_exchangeName_dlx"> <!--绑定死信队列--> <rabbit:bindings> <rabbit:binding pattern="order.dlx.#" queue="order_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> </beans>

    编写生产端

    /* 队列延迟 注意,我们需要先把队列和交换机创建出来(随便执行一个测试方法发送一条消息即可),要不然后面我们先启动消费者没有找到队列和交换机就报错了 */ @Test public void delayTest()throws Exception { //模拟发送订单信息,将来是在订单系统中,下单成功后,发送消息 String message="订单信息: id=1,time=2019年12月30日19:41:56"; rabbitTemplate.convertAndSend("order_exchangeName", "order.msg", message); //打印倒计时10秒 for (int i = 10; i > 0; i--) { System.out.println(i); Thread.sleep(1000); } }

    编写消费端监听器

    package cn.cdw.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /* 延迟队列的模拟订单过时测试 */ @Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ System.out.println(new String(message.getBody())); System.out.println("处理业务逻辑"); System.out.println("根据订单id查询其状态"); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存"); //签收 channel.basicAck(deliveryTag, true); }catch (Exception e){ // e.printStackTrace(); System.out.println("出现异常,拒绝签收"); //拒签 channel.basicNack(deliveryTag, true, true); } } }

    配置注册消费端监听器

    <!-- 消费端订单监听器 延迟队列效果实现:一定要监听的是死信队列 注意:它应该监听的是这个死信的队列,因为这个死信队列有延迟的功能 如果监听正常的队列那么,消息一来就到了,没启动需求的作用 --> <rabbit:listener ref="orderListener" queue-names="order_queueName_dlx"></rabbit:listener>

    测试:启动消费端,再启动生产端,注意控制台打印,消费端需要等待10秒才会收到死信队列的消息,因为死信队列有延迟队列功能(TTL+DLX实现)

    小结

    延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

    RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。

    日志与监控

    RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

    查看虚拟机命令: rabbitmqctl list_vhosts

    查看连接命令: rabbitmqctl list_connections

    查看exchanges:rabbitmqctl list_exchanges

    查看消费者信息:rabbitmqctl list_consumers

    查看环境变量:rabbitmqctl environment

    查看未被确认的队列:rabbitmqctl list_queues name messages_unacknowledged

    查看单个队列的内存使用:rabbitmqctl list_queues name memory

    查看准备就绪的队列:rabbitmqctl list_queues name messages_ready

    消息追踪

    注意使用guest用户,虚拟机使用 / 默认的

    在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和

    在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。维人员进行问题的定位。

    消息追踪-Firehose

    将队列绑定到amq.rabbitmq.trace,路由key为#,所有的请求消息它都发送

    rabbitmqctl trace_on:开启Firehose命令

    ​ 注意:打开 trace 会影响消息写入功能,适当打开后请关闭

    建议:在开发阶段我们可以开启消息追踪,在实际生产环境建议将其关闭

    rabbitmqctl trace_off:关闭Firehose命令

    我们向队列发送一条消息

    我们发现当前消息也正常存在,并且开启消息追踪后,会多出一条消息是 amq.rabbitmq.trace 交换机发给当前队列的消息,消息中的内容是比较完整的

    消息追踪-rabbitmq_tracing

    rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

    启用插件:rabbitmq-plugins enable rabbitmq_tracing

    查看插件命令:

    先输入命令:rabbitmq-plugins

    列表查看: rabbitmq-plugins list

    ​ e* 表示启用的

    建议:在开发阶段我们可以开启消息追踪插件,在实际生产环境不建议建议开启,除非是非常特殊的业务场景

    关闭插件:rabbitmq-plugins disable rabbitmq_tracing

    关闭Firehose:rabbitmqctl trace_off

    Processed: 0.013, SQL: 9