前置准备
创建 Spring Boot 项目(版本 2.2);准备 RabbitMQ 服务器(版本 3.7.17);引入依赖:
<!-- Spring Boot AMQP starter. No <version> is given here, so it is presumably supplied by the
     project's Spring Boot dependency management (parent POM or BOM) - confirm in the full pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
整合生产者
配置文件
# Broker connection used by the producer application.
# NOTE(review): sample credentials and a LAN address; replace them for any real environment.
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.host=192.168.1.108
spring.rabbitmq.port=5672
# Publisher confirms with correlation data; the result is reported to the ConfirmCallback.
spring.rabbitmq.publisher-confirm-type=correlated
# Publisher returns: unroutable messages are reported to the ReturnCallback.
spring.rabbitmq.publisher-returns=true
# Mandatory flag for the auto-configured RabbitTemplate, so unroutable messages are returned
# instead of being silently dropped.
spring.rabbitmq.template.mandatory=true
# Template-level retry is switched off.
# NOTE(review): the four tuning values below presumably have no effect while enabled=false.
spring.rabbitmq.template.retry.enabled=false
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0
配置类1
/**
 * Producer-side RabbitMQ configuration (variant 1).
 *
 * <p>Declares the exchange, queue and binding, all named {@code "shanguoyu"}, and attaches the
 * confirm and return callbacks to the {@link RabbitTemplate} that is injected into the callback
 * bean methods.
 */
@Configuration
public class RabbitConfig {

    /**
     * Declares the direct exchange.
     *
     * @return a direct exchange named "shanguoyu" with durable=false, autoDelete=false and no
     *     extra arguments
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("shanguoyu", false, false, null);
    }

    /**
     * Declares the queue.
     *
     * @return a queue named "shanguoyu" with durable=false, exclusive=false, autoDelete=false and
     *     no extra arguments
     */
    @Bean
    public Queue directQueue() {
        return new Queue("shanguoyu", false, false, false, null);
    }

    /**
     * Binds the queue to the exchange with routing key "shanguoyu".
     *
     * @param directQueue the queue bean declared above
     * @param directExchange the exchange bean declared above
     * @return the binding between the two
     */
    @Bean
    public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("shanguoyu");
    }

    /**
     * Creates the publisher-confirm callback and registers it on the given template.
     *
     * @param rabbitTemplate the template that receives the callback
     * @return the callback that was registered
     */
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback(RabbitTemplate rabbitTemplate) {
        RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("已发送到交换器!");
            } else {
                System.out.println("出现了异常!(考虑重发)");
                System.out.println("原因是:" + cause);
            }
        };
        rabbitTemplate.setConfirmCallback(confirmCallback);
        return confirmCallback;
    }

    /**
     * Creates the returned-message callback and registers it on the given template.
     *
     * @param rabbitTemplate the template that receives the callback
     * @return the callback that was registered
     */
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback(RabbitTemplate rabbitTemplate) {
        RabbitTemplate.ReturnCallback returnCallback =
                (message, replyCode, replyText, exchange, routingKey) -> {
                    System.out.println("失败通知:消息不可路由!");
                    // Decode with an explicit charset instead of the platform default, which
                    // would garble non-ASCII payloads on JVMs whose default is not UTF-8.
                    // Assumes the sender encoded the body as UTF-8.
                    System.out.println("消息是:"
                            + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println("交换器是::" + exchange);
                    System.out.println("路由键是:" + routingKey);
                    System.out.println("状态码:" + replyCode);
                    System.out.println("错误原因:" + replyText);
                };
        rabbitTemplate.setReturnCallback(returnCallback);
        return returnCallback;
    }
}
配置类2
/**
 * Producer-side RabbitMQ configuration (variant 2).
 *
 * <p>Declares the exchange, queue and binding, all named {@code "shanguoyu"}, and builds the
 * {@link RabbitTemplate} explicitly, wiring in the confirm and return callbacks defined as
 * separate beans.
 */
@Configuration
public class RabbitConfig {

    /**
     * Value of {@code spring.rabbitmq.template.mandatory}. Defaults to {@code true} when the
     * property is absent, which matches the value this class previously hard-coded.
     */
    @Value("${spring.rabbitmq.template.mandatory:true}")
    private boolean mandatory;

    /**
     * Declares the direct exchange.
     *
     * @return a direct exchange named "shanguoyu" with durable=false, autoDelete=false and no
     *     extra arguments
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("shanguoyu", false, false, null);
    }

    /**
     * Declares the queue.
     *
     * @return a queue named "shanguoyu" with durable=false, exclusive=false, autoDelete=false and
     *     no extra arguments
     */
    @Bean
    public Queue directQueue() {
        return new Queue("shanguoyu", false, false, false, null);
    }

    /**
     * Binds the queue to the exchange with routing key "shanguoyu".
     *
     * @param directQueue the queue bean declared above
     * @param directExchange the exchange bean declared above
     * @return the binding between the two
     */
    @Bean
    public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("shanguoyu");
    }

    /**
     * Builds the template used for publishing.
     *
     * @param connectionFactory the connection factory the template publishes through
     * @param confirmCallback callback invoked with the broker's publisher-confirm result
     * @param returnCallback callback invoked for returned (unroutable) messages
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         RabbitTemplate.ConfirmCallback confirmCallback,
                                         RabbitTemplate.ReturnCallback returnCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Honour the configured property; the injected field used to be ignored in favour of a
        // hard-coded true.
        rabbitTemplate.setMandatory(mandatory);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        return rabbitTemplate;
    }

    /**
     * Creates the publisher-confirm callback.
     *
     * @return a callback that prints whether the broker acknowledged the publish and, if not,
     *     the cause
     */
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("已发送到交换器!");
            } else {
                System.out.println("出现了异常!(考虑重发)");
                System.out.println("原因是:" + cause);
            }
        };
    }

    /**
     * Creates the returned-message callback.
     *
     * @return a callback that prints the returned message, exchange, routing key, reply code and
     *     reply text
     */
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback() {
        return (message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("失败通知:消息不可路由!");
            // Decode with an explicit charset instead of the platform default, which would
            // garble non-ASCII payloads on JVMs whose default is not UTF-8.
            // Assumes the sender encoded the body as UTF-8.
            System.out.println("消息是:"
                    + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("交换器是::" + exchange);
            System.out.println("路由键是:" + routingKey);
            System.out.println("状态码:" + replyCode);
            System.out.println("错误原因:" + replyText);
        };
    }
}
整合消费者
配置文件
# Broker connection used by the consumer application.
# NOTE(review): sample credentials and a LAN address; replace them for any real environment.
spring.rabbitmq.host=192.168.1.108
# Same AMQP port as the producer configuration (5672), so both applications use the same listener.
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
# Manual acknowledgements: the listener method must ack or nack each delivery itself.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# NOTE(review): a prefetch count of 0 means "no limit" in AMQP basic.qos; confirm that unlimited
# prefetch is intended together with manual acks.
spring.rabbitmq.listener.simple.prefetch=0
# Minimum and maximum number of listener invoker threads.
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
# Rejected deliveries are not requeued by default.
spring.rabbitmq.listener.simple.default-requeue-rejected=false
监听类
/**
 * Consumer for the "shanguoyu" queue. Acknowledges each delivery on the channel itself rather
 * than leaving it to the container.
 */
@Component
public class RabbitReceiver {

    /**
     * Prints the message body and acknowledges the delivery.
     *
     * <p>If processing fails, the delivery is negatively acknowledged without requeue and the
     * exception is rethrown, so the message does not stay unacknowledged on the channel.
     *
     * @param message the received message
     * @param channel the channel the message was delivered on, used to ack or nack
     * @throws Exception if processing fails or the ack/nack cannot be sent
     */
    @RabbitListener(queues = {"shanguoyu"})
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode with an explicit charset instead of the platform default, which would
            // garble non-ASCII payloads on JVMs whose default is not UTF-8.
            // Assumes the producer encoded the body as UTF-8.
            System.out.println(
                    new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Reject this single delivery without requeue, then surface the failure.
            channel.basicNack(deliveryTag, false, false);
            throw e;
        }
        // multiple=false: acknowledge only this delivery.
        channel.basicAck(deliveryTag, false);
    }
}
转载请注明原文地址:https://blackberry.8miu.com/read-16294.html