基本API
// Basic lifecycle: configure a factory, open a connection, open a channel, then close both.
ConnectionFactory connectionFactory = new ConnectionFactory();
// NOTE(review): the host is an empty string here — looks like a placeholder to be replaced with the broker address.
connectionFactory.setHost("");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
// All later snippets issue their broker operations through this channel.
Channel channel = connection.createChannel();
// Close the channel first, then the connection it was created from.
channel.close();
connection.close();
创建交换机
// Declare a direct exchange named "shanguoyu". The three boolean flags are durable, autoDelete
// and internal (all false here); the final argument carries optional exchange arguments (none).
channel.exchangeDeclare("shanguoyu", BuiltinExchangeType.DIRECT,false,false,false,null);
创建备用交换机
// Declare the exchange that will act as the alternate (fallback) exchange.
channel.exchangeDeclare("shanguoyu2", BuiltinExchangeType.DIRECT,false,false,false,null);
// Declare the primary exchange "shanguoyu3" and point its "alternate-exchange" argument at "shanguoyu2".
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("alternate-exchange","shanguoyu2");
channel.exchangeDeclare("shanguoyu3", BuiltinExchangeType.DIRECT,false,false,false,argsMap);
创建队列
// Declare a queue named "shanguoyu". The three boolean flags are durable, exclusive and
// autoDelete (all false here); the final argument carries optional queue arguments (none).
channel.queueDeclare("shanguoyu",false,false,false,null);
死信交换机
// Declare the exchange that will receive dead-lettered messages.
channel.exchangeDeclare("shanguoyu4", BuiltinExchangeType.DIRECT,false,false,false,null);
// Declare queue "shanguoyu2" with "x-dead-letter-exchange" set to "shanguoyu4".
// NOTE(review): no queue is bound to "shanguoyu4" in this snippet — a binding is presumably needed elsewhere to keep the dead letters.
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-dead-letter-exchange","shanguoyu4");
channel.queueDeclare("shanguoyu2",false,false,false,argsMap);
延时队列
/**
 * Declares the broker topology for a delay queue built from a per-queue message TTL
 * combined with dead-lettering.
 *
 * <p>Messages published to exchange {@code shanguoyu} with routing key {@code shanguoyu}
 * land in queue {@code shanguoyu}. That queue is declared with a 20 second
 * {@code x-message-ttl} and dead-letter settings that target {@code DEAD_LETTER_EXCHANGE}
 * with routing key {@code DL}, which is bound to {@code DEAD_LETTER_QUEUE}.
 */
public class QueueTest3 {
    /**
     * Connects to the broker, declares the exchanges, queues and bindings, then disconnects.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.108");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        // The original never closed the channel or the connection. try-with-resources
        // closes both (channel first) on success and on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Destination for expired messages.
            channel.exchangeDeclare("DEAD_LETTER_EXCHANGE", BuiltinExchangeType.DIRECT, false, false, false, null);
            channel.queueDeclare("DEAD_LETTER_QUEUE", false, false, false, null);
            channel.queueBind("DEAD_LETTER_QUEUE", "DEAD_LETTER_EXCHANGE", "DL");

            // Holding queue: messages wait here until their TTL expires.
            channel.exchangeDeclare("shanguoyu", BuiltinExchangeType.DIRECT, false, false, false, null);
            Map<String, Object> map = new HashMap<>();
            map.put("x-message-ttl", TimeUnit.SECONDS.toMillis(20));
            map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE");
            map.put("x-dead-letter-routing-key", "DL");
            channel.queueDeclare("shanguoyu", false, false, false, map);
            channel.queueBind("shanguoyu", "shanguoyu", "shanguoyu");
        }
    }
}
/**
 * Publishes a single {@code "Hello"} message to exchange {@code shanguoyu} with
 * routing key {@code shanguoyu}.
 */
class Product {
    /**
     * Connects, publishes one message and disconnects.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.108");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        // try-with-resources so the channel and connection are released even when
        // basicPublish throws; the original only closed them on the success path.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Encode explicitly instead of relying on the platform default charset.
            byte[] body = "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish("shanguoyu", "shanguoyu", null, body);
        }
    }
}
/**
 * Declares the topology for the delayed-message variant: an exchange of type
 * {@code x-delayed-message} (behaving as a direct exchange via {@code x-delayed-type}),
 * one queue, and the binding between them under routing key {@code DL}.
 */
public class QueueTest4 {
    /**
     * Connects, declares the exchange, queue and binding, then disconnects.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.108");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("123456");

        Connection conn = factory.newConnection();
        Channel ch = conn.createChannel();

        // The exchange type is passed as a plain string, not a BuiltinExchangeType value.
        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "direct");
        ch.exchangeDeclare("dea.letter.exchange", "x-delayed-message", false, false, false, exchangeArgs);

        ch.queueDeclare("dead.letter.queue", false, false, false, null);
        ch.queueBind("dead.letter.queue", "dea.letter.exchange", "DL");

        ch.close();
        conn.close();
    }
}
class Product2{
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.108");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Map<String, Object> stringObjectHashMap = new HashMap<>();
stringObjectHashMap.put("x-delay", 20000);
AMQP.BasicProperties build = properties.builder().headers(stringObjectHashMap).build();
channel.basicPublish("dea.letter.exchange","DL",build , "Hello".getBytes());
Thread.sleep(20000);
System.out.println("到达了!");
channel.close();
connection.close();
}
}
过期时间的队列(其他参数不再举例)
// Declare queue "shanguoyu3" with the "x-expires" argument set to 5000.
// NOTE(review): x-expires is the queue's own expiry in milliseconds, not a message TTL — confirm this is the intent.
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-expires", 5000);
channel.queueDeclare("shanguoyu3",false,false,false,argsMap);
创建绑定关系
// Bind queue "shanguoyu" to exchange "shanguoyu" under routing key "shanguoyu", with no binding arguments.
channel.queueBind("shanguoyu","shanguoyu","shanguoyu",null);
生产者发送消息
// Publish "Hello" (UTF-8) to exchange "shanguoyu" with routing key "shanguoyu".
// The two boolean flags are mandatory and immediate (both false); null means no message properties.
channel.basicPublish("shanguoyu","shanguoyu",false,false,null,"Hello".getBytes(StandardCharsets.UTF_8));
生产者失败通知
// Register a callback for messages the broker returns to the publisher, and print every
// part of the returned message.
channel.addReturnListener((replycode, replyText, exchange, routingKey, basicProperties, bytes) -> {
    System.out.println("交换机为:" + exchange);
    System.out.println("路由键为:" + routingKey);
    // Decode with the same charset the message is published with below, rather than
    // the platform default.
    System.out.println("消息体为:" + new String(bytes, StandardCharsets.UTF_8));
    System.out.println("消息头帧为:" + basicProperties);
    System.out.println("错误文本为:" + replyText);
    System.out.println("错误码为:" + replycode);
});
// Publish with mandatory=true (third argument) so an unroutable message is returned to
// the listener above; routing key "shanguoyu1" is presumably one that has no binding.
channel.basicPublish("shanguoyu", "shanguoyu1", true, false, null, "Hello".getBytes(StandardCharsets.UTF_8));
发送方确认——一般确认
// Publisher confirms, one message at a time: switch the channel to confirm mode,
// publish, then block until the broker reports the outcome.
channel.confirmSelect();
channel.basicPublish("shanguoyu", "asdsadsa", false, false, null, "Hello".getBytes());
boolean confirmed = channel.waitForConfirms();
String outcome = confirmed ? "发送成功" : "发送失败";
System.out.println(outcome);
发送方确认——批量确认
// Publisher confirms, batch style: switch the channel to confirm mode, publish the
// whole batch, then wait once for all of it.
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    channel.basicPublish("shanguoyu","asdsadsa",false,false,null,"Hello".getBytes());
}
// The wait must come AFTER the publishes: it covers only messages already sent.
// The original called it before the loop, so none of the ten messages was ever waited on.
channel.waitForConfirmsOrDie();
发送方确认——异步确认
channel.confirmSelect();
channel.addConfirmListener(
((deliveryTag,multiple)->System.out.println("发送成功标签是:"+deliveryTag+",multiple:"+multiple)),
((deliveryTag,multiple)->System.out.println("发送失败标签是:"+deliveryTag+",multiple:"+multiple))
);
for (int i = 0; i < 10; i++) {
channel.basicPublish("shanguoyu","asdsadsa",false,false,null,"Hello".getBytes());
}
事务
// Transaction API overview on a fresh channel.
Channel channel = connection.createChannel();
// Put the channel into transactional mode.
channel.txSelect();
// Commit the work done on the channel since txSelect or the last commit/rollback.
channel.txCommit();
// Roll back instead of committing.
// NOTE(review): commit and rollback are listed back-to-back as an API tour; real code would presumably call one or the other.
channel.txRollback();
消费者消费消息(订阅方式)
// Push-style (subscription) consumer: the client library calls handleDelivery for
// each message, which prints the body and the envelope details.
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        final String payload = new String(body, "UTF-8");
        final String routingKey = envelope.getRoutingKey();
        final String exchange = envelope.getExchange();
        final long deliveryTag = envelope.getDeliveryTag();

        System.out.println("消息体为:" + payload);
        System.out.println("路由键为:" + routingKey);
        System.out.println("交换机为:" + exchange);
        System.out.println("标签为:" + deliveryTag);
    }
};
// Second argument is autoAck=false.
// NOTE(review): nothing in this handler acknowledges the delivery — confirm that is intended.
channel.basicConsume("shanguoyu", false, "", false, false, null, consumer);
消息的确认——手动ACK
// Manual acknowledgement: subscribe with autoAck=false and ack each delivery
// explicitly once it has been printed.
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        final long deliveryTag = envelope.getDeliveryTag();
        final String text = new String(body, "UTF-8");

        System.out.println("消息体为:" + text);
        System.out.println("路由键为:" + envelope.getRoutingKey());
        System.out.println("交换机为:" + envelope.getExchange());
        System.out.println("标签为:" + deliveryTag);

        // multiple=false: acknowledge only this delivery tag.
        channel.basicAck(deliveryTag, false);
    }
};
channel.basicConsume("shanguoyu", false, "", false, false, null, consumer);
消息的确认——自动ACK
// Automatic acknowledgement: subscribe with autoAck=true, so the handler only
// prints and never acks.
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String decoded = new String(body, "UTF-8");
        String key = envelope.getRoutingKey();
        String source = envelope.getExchange();
        long tag = envelope.getDeliveryTag();

        System.out.println("消息体为:" + decoded);
        System.out.println("路由键为:" + key);
        System.out.println("交换机为:" + source);
        System.out.println("标签为:" + tag);
    }
};
// Second argument is autoAck=true.
channel.basicConsume("shanguoyu", true, "", false, false, null, consumer);
消息的单条拒绝——Reject
// Single-message rejection: subscribe with autoAck=false and reject every delivery
// after printing it.
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        final long rejectedTag = envelope.getDeliveryTag();
        final String content = new String(body, "UTF-8");

        System.out.println("消息体为:" + content);
        System.out.println("路由键为:" + envelope.getRoutingKey());
        System.out.println("交换机为:" + envelope.getExchange());
        System.out.println("标签为:" + rejectedTag);

        // requeue=true: ask the broker to put the message back on the queue.
        // NOTE(review): with this as the only consumer the message will presumably be redelivered in a loop.
        channel.basicReject(rejectedTag, true);
    }
};
channel.basicConsume("shanguoyu", false, "", false, false, null, consumer);
消息的批量拒绝——Nack
// Negative acknowledgement: subscribe with autoAck=false and nack every delivery
// after printing it.
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        final String bodyText = new String(body, "UTF-8");
        final long nackedTag = envelope.getDeliveryTag();

        System.out.println("消息体为:" + bodyText);
        System.out.println("路由键为:" + envelope.getRoutingKey());
        System.out.println("交换机为:" + envelope.getExchange());
        System.out.println("标签为:" + nackedTag);

        // Arguments are (deliveryTag, multiple=false, requeue=true).
        // NOTE(review): multiple=false nacks only this delivery; a batch nack would need multiple=true.
        channel.basicNack(nackedTag, false, true);
    }
};
channel.basicConsume("shanguoyu", false, "", false, false, null, consumer);
消费端限流(QOS)
// Consumer-side flow control. Arguments are (prefetchSize=0, prefetchCount=1, global=false),
// so the limit is one unacknowledged delivery at a time.
channel.basicQos(0, 1, false);
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        final long ackTag = envelope.getDeliveryTag();
        final String received = new String(body, "UTF-8");

        System.out.println("消息体为:" + received);
        System.out.println("路由键为:" + envelope.getRoutingKey());
        System.out.println("交换机为:" + envelope.getExchange());
        System.out.println("标签为:" + ackTag);

        // Ack this single delivery (multiple=false).
        channel.basicAck(ackTag, false);
    }
};
// autoAck must be false for the prefetch limit to take effect.
channel.basicConsume("shanguoyu", false, "", false, false, null, consumer);
转载请注明原文地址:https://blackberry.8miu.com/read-4016.html