RabbitMQ入门学习笔记

    科技2024-07-15  66

    消息中间件之RabbitMQ

    消息中间件概述什么是MQ?为什么要使用MQ?MQ的应用场景消息队列的方式AMQPJMSAMQP 与 JMS 区别常见的消息队列 RabbitMQRabbitMQ之简单模式 RabbitMQ常见问题消息堆积消息堆积的影响产生堆积的原因解决方案场景介绍模拟消息堆积 消息丢失消息在生产者丢失消息在RabbitMQ丢失消息在消费者丢失 有序消费消息重复消费

    消息中间件概述

    什么是MQ?

    MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

    为什么要使用MQ?

    在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

    MQ的应用场景

    异步处理

    将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

    举例–当我们在进行签到的时候,点击签到之后,系统返回签到成功的提示给用户,但此时我们的积分并不是马上进增加了,我们的签到请求会被放在一个消息队列中,再由业务逻辑层从消息队列中获取这个请求之后才去进行逻辑处理,这样就大大的缩短了给用户的响应时间。

    应用程序解耦合

    MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

    削峰填谷

    如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

    消息队列的方式

    MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

    AMQP

    AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

    JMS

    JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

    AMQP 与 JMS 区别

    JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。JMS规定了两种消息模式;而AMQP的消息模式更加丰富

    常见的消息队列

    ActiveMQ:基于JMS ZeroMQ:基于C语言开发 RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好 RocketMQ:基于JMS,阿里巴巴产品 Kafka:类似MQ的产品;分布式消息系统,高吞吐量

    RabbitMQ

    RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

    RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式

    RabbitMQ之简单模式

    生产者

    public class Producer { static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址;默认为 localhost connectionFactory.setHost("localhost"); //连接端口;默认为 5672 connectionFactory.setPort(5672); //虚拟主机名称;默认为 / connectionFactory.setVirtualHost("/gzgs"); //连接用户名;默认为guest connectionFactory.setUsername("guest"); //连接密码;默认为guest connectionFactory.setPassword("guest"); ​ //创建连接 Connection connection = connectionFactory.newConnection(); ​ // 创建频道 Channel channel = connection.createChannel(); ​ // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); ​ // 要发送的信息 String message = "你好;小兔子!"; /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息:" + message); ​ // 关闭资源 channel.close(); connection.close(); } }

    消费者

    public class Consumer { ​ public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); ​ // 创建频道 Channel channel = connection.createChannel(); ​ // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); ​ //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("接收到的消息为:" + new String(body, "utf-8")); } }; //监听消息 /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.QUEUE_NAME, true, consumer); ​ //不关闭资源,应该一直监听消息 //channel.close(); //connection.close(); } }

    抽取创建连接工具类

    public class ConnectionUtil { public static Connection getConnection() throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址;默认为 localhost connectionFactory.setHost("localhost"); //连接端口;默认为 5672 connectionFactory.setPort(5672); //虚拟主机名称;默认为 / connectionFactory.setVirtualHost("/gzgs"); //连接用户名;默认为guest connectionFactory.setUsername("guest"); //连接密码;默认为guest connectionFactory.setPassword("guest");​ //创建连接 return connectionFactory.newConnection(); }​ }

    后面其他模式的代码不再赘述,有需要可以下载上传的代码

    RabbitMQ常见问题

    消息堆积

    当消息生产的速度长时间,远远大于消费的速度时。就会造成消息堆积。

    消息堆积的影响

    可能导致新消息无法进入队列

    可能导致旧消息无法丢失

    消息等待消费的时间过长,超出了业务容忍范围。

    产生堆积的原因

    生产者突然大量发布消息

    消费者消费失败

    消费者出现性能瓶颈。

    消费者挂掉

    解决方案

    排查消费者的消费性能瓶颈

    增加消费者的多线程处理

    部署增加多个消费者

    场景介绍

    用户登录成功之后,会向rabbitmq发送一个登录成功的消息。这个消息可以被多类业务订阅。

    登录成功,记录登录日志;登录成功,根据规则送积分。其中登录送积分可以模拟成较为耗时的处理

    模拟消息堆积

    1、生产者大量发送消息:使用Jmeter开启多线程,循环发送消息大量进入队列,模拟堆积10万条数据。

    2、消费者消费失败:随机抛出异常,模拟消费者消费失败,没有ack(手动ack的时候)。

    3、设置消费者的性能瓶颈:在消费方法中设置休眠时间,模拟性能瓶颈

    4、关闭消费者:停掉消费者,模拟消费者挂掉

    5、消费者端示例核心代码:

    public class LoginIntegralComsumer implements MessageListener { public void onMessage(Message message) { String jsonString = null; try { jsonString = new String(message.getBody(),"UTF8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } if(new Random().nextInt(5)==2){ //模拟发生异常 throw new RuntimeException("模拟处理异常"); } try { //模拟耗时的处理过程 TimeUnit.MILLISECONDS.sleep(1000); System.out.println(Thread.currentThread().getName()+"处理消息:"+jsonString); } catch (InterruptedException e) { e.printStackTrace(); } } }

    6、如果每1秒钟处理一条消息,1小时处理 60*60=3600条,处理完10万条数据 100000/3600=27.7小时 。

    消息丢失

    在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致消息没有消费成功,从而造成数据不一致等问题,造成严重的影响,比如:在一个商城的下单业务中,需要生成订单信息和扣减库存两个动作,如果使用RabbitMQ来实现该业务,那么在订单服务下单成功后需要发送一条消息到库存服务进行扣减库存,如果在此过程中,一条消息因为某些原因丢失,那么就会出现下单成功但是库存没有扣减,从而导致超卖的情况,也就是库存已经没有了,但是用户还能下单,这个问题对于商城系统来说是致命的。

    消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。

    消息在生产者丢失

    场景介绍

    消息生产者发送消息成功,但是MQ没有收到该消息,消息在从生产者传输到MQ的过程中丢失,一般是由于网络不稳定的原因。

    解决方案

    采用RabbitMQ 发送方消息确认机制,当消息成功被MQ接收到时,会给生产者发送一个确认消息,表示接收成功。RabbitMQ 发送方消息确认模式有以下三种:普通确认模式,批量确认模式,异步监听确认模式。spring整合RabbitMQ后只使用了异步监听确认模式。

    说明

    异步监听模式,可以实现边发送消息边进行确认,不影响主线程任务执行。

    步骤

    生产者发送3000条消息

    在发送消息前开启开启发送方确认模式

    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" />

    在发送消息前添加异步确认监听器

    //添加异步确认监听器 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { // 处理ack System.out.println("已确认消息,标识:" + correlationData.getId()); } else { // 处理nack, 此时cause包含nack的原因。 System.out.println("未确认消息,标识:" + correlationData.getId()); System.out.println("未确认原因:" + cause); //重发 } } });

    消息在RabbitMQ丢失

    场景介绍

    消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况

    解决方案

    持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。

    spring整合后默认开启了交换机,队列,消息的持久化,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。

    消息在消费者丢失

    场景介绍

    消息费者消费消息时,如果设置为自动回复MQ,消息者端收到消息后会自动回复MQ服务器,MQ则会删除该条消息,如果消息已经在MQ被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。

    解决方案

    设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。本解决方案以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率

    MQ重发消息场景: 1.消费者未响应ACK,主动关闭频道或者连接 2.消费者未响应ACK,消费者服务挂掉

    有序消费消息

    场景介绍

    场景1

    当RabbitMQ采用work Queue模式,此时只会有一个Queue但是会有多个Consumer,同时多个Consumer直接是竞争关系,此时就会出现MQ消息乱序的问题。

    场景2

    当RabbitMQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。

    场景1解决

    场景2解决

    重复消费

    场景介绍

    为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。而有些操作是不允许重复消费的,比如下单,减库存,扣款等操作。

    MQ重发消息场景: 1.消费者未响应ACK,主动关闭频道或者连接 2.消费者未响应ACK,消费者服务挂掉

    10.4.2. 解决方案

    如果消费消息的业务是幂等性操作(同一个操作执行多次,结果不变)就算重复消费也没问题,可以不做处理,如果不支持幂等性操作,如:下单,减库存,扣款等,那么可以在消费者端每次消费成功后将该条消息id保存到数据库,每次消费前查询该消息id,如果该条消息id已经存在那么表示已经消费过就不再消费否则就消费。本方案采用redis存储消息id,因为redis是单线程的,并且性能也非常好,提供了很多原子性的命令,本方案使用setnx命令存储消息id。

    setnx(key,value):如果key不存在则插入成功且返回1,如果key存在,则不进行任何操作,返回0

    本文参考黑马教程,学习资料下载链接:https://download.csdn.net/download/weixin_45680962/12913763

    Processed: 0.012, SQL: 8