基于Spring Boot工程浅谈整合RABBITMQ怎么样防止消息发送MQ不丢失和消费MQ的消息防止丢失

    科技2024-12-22  7

    本文只针对springboot整合rabbitmq的消息防丢失,话不多说,上干货....

     

    设置发送mq消息不丢失实现思路

     

    执行的方案:

    第一步,要对队列,消息以及交换机进行持久化操作(保存到物理磁盘中)

    因为mq消息默认是保存在内存中

    交换机我们在声明的时候可以进行持久化

     

    @Bean(EX_BUYING_ADDPOINTUSER) public Exchange EX_BUYING_ADDPOINTUSER(){ return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build(); } 解析: durable(true)表示对当前交换进行持久化

     

    队列持久化

    @Bean public Queue queue(){ return new Queue(ORDER_TACK); } 解析: 当前new的过程中如果只有一个参数则表示默认的就是已经持久化了 源码: public Queue(String name) { this(name, true, false, false); }

     

    注意:

    消息持久化,不需要设置的,我们的消息是保存在队列中,队列如果说是持久化的,那么我们的消息就是持久化的。

     

    confirm机制 confirm模式需要基于channel进行设置, 一旦某条消息被投递到队列之后,消息队列就会发送一个确认信息给生产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写入到磁盘之后发出. confirm的性能高,主要得益于它是异步的.生产者在将第一条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调方法处理这条确认消息. 如果MQ服务宕机了,则会返回nack消息. 生产者同样在回调方法中进行后续处理。

     

    思路是把要发送的消息先放一份到redis中 ,当消息发到了交换机exchange中完成就回返回ack为true,那么就完成发送删除redis的消息,否则就从redis取出消息再次发送.直到发送成功....

    //增强rabbitmq,代替原来发送消息的一个类,可以防止丢失数据 @Component public class ConfirmMessageSender implements RabbitTemplate.ConfirmCallback {

    @Autowired private RabbitTemplate rabbitTemplate;

    @Autowired private RedisTemplate redisTemplate;

    public static final String MESSAGE_CONFIRM_KEY="message_confirm_";

    //有参构造 public ConfirmMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //rabbitmq设置提交回调 rabbitTemplate.setConfirmCallback(this); } //接收消息服务器返回的通知的 ,成功的通知和失败通知,第二步 /** * * @param correlationData 用来保证消息的唯一 * @param ack 应答 true表示成功的通知,false失败的通知 * @param cause 原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //成功通知,表示我们数据已经发送成功并且持久化到了磁盘中 //删除redis中的相关数据 redisTemplate.delete(correlationData.getId()); redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId()); }else{ //失败通知 ,没有持久化到磁盘中 //从redis中获取刚才的消息内容 Map<String,String> map = (Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY+correlationData.getId()); //重新发送 String exchange = map.get("exchange"); String routingkey = map.get("routingKey"); String message = map.get("message"); //再次发送,直到持久化到磁盘为止,每次发送都带着唯一标识 rabbitTemplate.convertAndSend(exchange,routingkey, JSON.toJSONString(message),correlationData); } }

    //自定义消息发送方法,第一步,当执行该方法的之后,去调用confirm public void sendMessage(String exchange,String routingKey,String message){ /** * 重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。 */ //设置消息的唯一标识并存入到redis中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //把消息缓存到redis中 ,目的是备份 redisTemplate.opsForValue().set(correlationData.getId(),message);

    //将本次发送消息的相关元数据保存到redis中 Map<String,String> map = new HashMap<>(); map.put("exchange",exchange); map.put("routingKey",routingKey); map.put("message",message); //把元数据缓存到redis中,保证消息的唯一 redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map);

    //携带着本次消息的唯一标识,进行数据发送 rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);

    } }

     

    获取获取mq中的消息不丢失?

     

    在application.yml中设置手动应答:

    rabbitmq: host: 192.168.200.128 listener: simple: acknowledge-mode: manual #手动

     

    在监听类中设置手动应答:

    简单来说就是当业务逻辑处理没问题就执行channel.basicAck的方法,来返回消费完成,如果出现问题了 就执行channel.basicNack的方法,消息会回到原有的队列,重新的发送,一直到消息发送业务逻辑执行成功

    @Component public class ConsumeListener { @RabbitListener(queues = RabbitMQConfig.SECKILL_ORDER_KEY) public void receiveSecKillOrderMessage(Channel channel, Message message){ try {

    .....逻辑处理........

     

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); //返回失败通知 //第一个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝 //第二个boolean false当前消息会进入到死信队列,true重新回到原有队列中,默认回到头部 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } catch (IOException e) { e.printStackTrace(); }

    }

    }

     

    流量削峰:保证在同一时间内流量进行削弱,然后放行过来

    在监听类中设置:

    channel.basicQos(300);

    这个300是官方给出的值,代表每次在mq中抓取300个消息消费,

    太大会影响服务器的性能,太小就回浪费

     

    希望对大家有帮助...

    Processed: 0.182, SQL: 8