执行的方案:
第一步,要对队列,消息以及交换机进行持久化操作(保存到物理磁盘中)
因为mq消息默认是保存在内存中
交换机我们在声明的时候可以进行持久化
@Bean(EX_BUYING_ADDPOINTUSER) public Exchange EX_BUYING_ADDPOINTUSER(){ return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build(); } 解析: durable(true)表示对当前交换进行持久化
队列持久化
@Bean public Queue queue(){ return new Queue(ORDER_TACK); } 解析: 当前new的过程中如果只有一个参数则表示默认的就是已经持久化了 源码: public Queue(String name) { this(name, true, false, false); }
消息持久化,不需要设置的,我们的消息是保存在队列中,队列如果说是持久化的,那么我们的消息就是持久化的。
confirm机制 confirm模式需要基于channel进行设置, 一旦某条消息被投递到队列之后,消息队列就会发送一个确认信息给生产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写入到磁盘之后发出. confirm的性能高,主要得益于它是异步的.生产者在将第一条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调方法处理这条确认消息. 如果MQ服务宕机了,则会返回nack消息. 生产者同样在回调方法中进行后续处理。
//增强rabbitmq,代替原来发送消息的一个类,可以防止丢失数据 @Component public class ConfirmMessageSender implements RabbitTemplate.ConfirmCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private RedisTemplate redisTemplate;
public static final String MESSAGE_CONFIRM_KEY="message_confirm_";
//有参构造 public ConfirmMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //rabbitmq设置提交回调 rabbitTemplate.setConfirmCallback(this); } //接收消息服务器返回的通知的 ,成功的通知和失败通知,第二步 /** * * @param correlationData 用来保证消息的唯一 * @param ack 应答 true表示成功的通知,false失败的通知 * @param cause 原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //成功通知,表示我们数据已经发送成功并且持久化到了磁盘中 //删除redis中的相关数据 redisTemplate.delete(correlationData.getId()); redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId()); }else{ //失败通知 ,没有持久化到磁盘中 //从redis中获取刚才的消息内容 Map<String,String> map = (Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY+correlationData.getId()); //重新发送 String exchange = map.get("exchange"); String routingkey = map.get("routingKey"); String message = map.get("message"); //再次发送,直到持久化到磁盘为止,每次发送都带着唯一标识 rabbitTemplate.convertAndSend(exchange,routingkey, JSON.toJSONString(message),correlationData); } }
//自定义消息发送方法,第一步,当执行该方法的之后,去调用confirm public void sendMessage(String exchange,String routingKey,String message){ /** * 重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。 */ //设置消息的唯一标识并存入到redis中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //把消息缓存到redis中 ,目的是备份 redisTemplate.opsForValue().set(correlationData.getId(),message);
//将本次发送消息的相关元数据保存到redis中 Map<String,String> map = new HashMap<>(); map.put("exchange",exchange); map.put("routingKey",routingKey); map.put("message",message); //把元数据缓存到redis中,保证消息的唯一 redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map);
//携带着本次消息的唯一标识,进行数据发送 rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
} }
在application.yml中设置手动应答:
rabbitmq: host: 192.168.200.128 listener: simple: acknowledge-mode: manual #手动
在监听类中设置手动应答:
@Component public class ConsumeListener { @RabbitListener(queues = RabbitMQConfig.SECKILL_ORDER_KEY) public void receiveSecKillOrderMessage(Channel channel, Message message){ try {
.....逻辑处理........
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); //返回失败通知 //第一个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝 //第二个boolean false当前消息会进入到死信队列,true重新回到原有队列中,默认回到头部 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } catch (IOException e) { e.printStackTrace(); }
}
}
在监听类中设置:
channel.basicQos(300);
这个300是官方给出的值,代表每次在mq中抓取300个消息消费,
太大会影响服务器的性能,太小就回浪费