Message Queue: RabbitMQ

    Technology, 2024-08-07

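    RabbitMQ Basics

    Characteristics of a message queue: 1. business-agnostic 2. FIFO (first in, first out) 3. fault tolerance 4. performance

    RabbitMQ features and core concepts

    Advanced Message Queuing Protocol (AMQP):

    Installing and starting RabbitMQ

    1 Installing RabbitMQ on Linux

    Official installation guide: https://www.rabbitmq.com/install-rpm.html

    Installing Erlang

    Configure the repository: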

    vim /etc/yum.repos.d/rabbitmq_erlang.repo

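    Copy the following configuration into rabbitmq_erlang.repo:

    [rabbitmq-erlang]
    name=rabbitmq-erlang
    baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/22/el/7
    gpgcheck=1
    gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
    repo_gpgcheck=0
    enabled=1

    Refresh the yum cache:

    yum clean all
    yum makecache

    Then run yum install erlang, and confirm that the repository yum lists for the package is "rabbitmq-erlang" (the repo id configured above).

    If your network connection is poor, you can instead download the package elsewhere, upload it to the Linux machine, and install it directly with yum install followed by the full path of the uploaded package.

    If erl -version prints a version, Erlang is installed. Running erl opens the Erlang shell.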

    Installing RabbitMQ

    We will install RabbitMQ version 3.8.2.

    Import the signing key:

    rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

    Download the RPM package:

    wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm

    If the download is slow or the Bintray URL is unavailable, use the GitHub release instead:

    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.2/rabbitmq-server-3.8.2-1.el7.noarch.rpm

    You can also download the package yourself and upload it to the server.

    Once the download finishes, install it:

    yum install rabbitmq-server-3.8.2-1.el7.noarch.rpm

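    If an unpacking error occurs, the package was probably downloaded more than once. Run ls -la to see how many files there are. If there are several packages, delete the extras, rename the correct one to rabbitmq-server-3.8.2-1.el7.noarch.rpm, and run yum install again.

    RabbitMQ is now installed.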

    Common RabbitMQ commands

    Stop RabbitMQ

    $ rabbitmqctl stop

    Enable start on boot

    $ systemctl enable rabbitmq-server

    Start RabbitMQ

    $ systemctl start rabbitmq-server

    Check that the ports are listening and view the node status

    $ rabbitmqctl status

    To check the status of the RabbitMQ service, run:

    $ systemctl status rabbitmq-server

    Enable the web management interface

    rabbitmq-plugins enable rabbitmq_management

    Using RabbitMQ

    The RabbitMQ management console

    Enable the web management interface (if it is not already enabled):

    rabbitmq-plugins enable rabbitmq_management

    Add a user account:

    Add a user named admin with the password 123456:

    rabbitmqctl add_user admin 123456

    Give the user a role by assigning admin the administrator tag (user administrator):

    rabbitmqctl set_user_tags admin administrator

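    The management console listens on port 15672 by default.

    You can then open the management page by entering the server's IP address and port number in a browser, for example http://<server-ip>:15672.

    Creating a RabbitMQ project:

    First create a Maven project, then add two dependencies:

    <!-- RabbitMQ client dependency -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!-- Logging dependency (no-op SLF4J binding) -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.29</version>
    </dependency>

    External clients connect to RabbitMQ on port 5672, so that port must be opened first. Then add the user under Permissions in the management console so that it is allowed to access the virtual host.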

    Steps for sending a message. The first producer:

    package helloWorld;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Hello World sender: connects to the RabbitMQ server, sends one message, then exits.
     */
    public class Send {

        private final static String QUEUE_NAME = "hello";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Create the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // Set the RabbitMQ address and credentials
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            // Open a connection
            Connection connection = factory.newConnection();
            // Open a channel
            Channel channel = connection.createChannel();
            // Declare the queue (non-durable, non-exclusive, not auto-deleted)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Publish the message through the default exchange; the routing key is the queue name
            String message = "Hello World";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Sent message: " + message);
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
    }

    The first consumer:

    package helloWorld;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Hello World receiver: connects to the RabbitMQ server and prints every message it receives.
     */
    public class Recv {

        private final static String QUEUE_NAME = "hello";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Create the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // Set the RabbitMQ address and credentials
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            // Open a connection
            Connection connection = factory.newConnection();
            // Open a channel
            Channel channel = connection.createChannel();
            // Declare the queue with the same arguments as the sender
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Receive and consume messages with automatic acknowledgement (autoAck = true).
            // The connection is left open so the consumer keeps running.
            channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                }
            });
        }
    }

    Distributing load evenly across multiple consumers. Fair dispatch based on workload:

    package workqueues;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Work-queue consumer: receives tasks one at a time and acknowledges each after processing.
     */
    public class MoreNewRecv {

        private final static String TASK_QUEUE_NAME = "task";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Create the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // Set the RabbitMQ address and credentials
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            // Open a connection
            Connection connection = factory.newConnection();
            // Open a channel
            final Channel channel = connection.createChannel();
            // Declare a durable queue
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            // Allow at most one unacknowledged message per consumer at a time
            channel.basicQos(1);
            // autoAck = false: messages are acknowledged manually
            channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                    try {
                        doWord(message);
                    } finally {
                        System.out.println("Finished processing message");
                        // Acknowledge the message only after processing has finished
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
        }

        // Simulates work: sleeps one second for every '.' in the task text
        private static void doWord(String task) {
            char[] chars = task.toCharArray();
            for (char ch : chars) {
                if (ch == '.') {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
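    To make the effect of basicQos(1) combined with manual acknowledgement more concrete, the following broker-free simulation compares it with RabbitMQ's default round-robin dispatch. It is only a sketch under stated assumptions: the class DispatchSketch and its methods are hypothetical and not part of the RabbitMQ client, every task is assumed to be in the queue before any worker starts, and each number is a task duration in seconds (standing in for the dots that doWord sleeps on).

```java
import java.util.Arrays;

/** Hypothetical, broker-free model of two dispatch strategies; not RabbitMQ code. */
class DispatchSketch {

    /** Round-robin: task i goes to worker i % workers, however busy that worker is. */
    static int[] roundRobin(int[] taskSeconds, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < taskSeconds.length; i++) {
            busy[i % workers] += taskSeconds[i];
        }
        return busy;
    }

    /**
     * Prefetch of 1 with manual ack: a worker gets its next task only after
     * finishing the previous one, so each task goes to the worker that is free first.
     */
    static int[] fair(int[] taskSeconds, int workers) {
        int[] freeAt = new int[workers];
        for (int seconds : taskSeconds) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w; // this worker becomes idle earlier
                }
            }
            freeAt[next] += seconds;
        }
        return freeAt;
    }

    public static void main(String[] args) {
        int[] tasks = {5, 1, 5, 1, 5, 1}; // alternating heavy and light tasks
        System.out.println("round-robin: " + Arrays.toString(roundRobin(tasks, 2)));
        System.out.println("fair:        " + Arrays.toString(fair(tasks, 2)));
    }
}
```

    With these durations, round-robin leaves one worker with 15 seconds of work and the other with 3, while the fair strategy ends up with 11 and 7.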

    Exchange types

    RabbitMQ has four exchange types: fanout, direct, topic, and headers. The fourth, headers, is rarely used.

    fanout mode

    Producer:

    package fanout;

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Sends log messages.
     */
    public class EmitLog {

        private static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String message = "info:Hello World";
            // A fanout exchange ignores the routing key, so it is left empty
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("Sent message: " + message);
            channel.close();
            connection.close();
        }
    }

    Consumer:

    package fanout;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Receives log messages.
     */
    public class ReceiveLogs {

        private static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // Declare a non-durable, auto-delete queue with a server-generated name
            String queueName = channel.queueDeclare().getQueue();
            // Bind the queue to the exchange; fanout ignores the binding key
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("Waiting for messages");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(message);
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }

    direct mode

    Example. A producer:

    package direct;

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Sends log messages at three levels.
     */
    public class EmitLogDirect {

        private static final String EXCHANGE_NAME = "DirectLogs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // The routing key carries the log level
            String message = "info:Hello World";
            channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
            System.out.println("Sent message, level info, content: " + message);

            message = "warning:Hello World";
            channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
            System.out.println("Sent message, level warning, content: " + message);

            message = "err:Hello World";
            channel.basicPublish(EXCHANGE_NAME, "err", null, message.getBytes("UTF-8"));
            System.out.println("Sent message, level err, content: " + message);

            channel.close();
            connection.close();
        }
    }

    A consumer that receives all three levels:

    package direct;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Receives logs of all three levels.
     */
    public class ReceiveLogsDirect1 {

        private static final String EXCHANGE_NAME = "DirectLogs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Declare a temporary, non-durable, auto-delete queue with a random server-generated name
            String queueName = channel.queueDeclare().getQueue();
            // Bind the one queue to the exchange with three binding keys
            channel.queueBind(queueName, EXCHANGE_NAME, "info");
            channel.queueBind(queueName, EXCHANGE_NAME, "warning");
            channel.queueBind(queueName, EXCHANGE_NAME, "err");
            System.out.println("Waiting for messages");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(message);
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }

    A consumer that receives only one level:

    package direct;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Receives only err-level logs.
     */
    public class ReceiveLogsDirect2 {

        private static final String EXCHANGE_NAME = "DirectLogs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Declare a temporary, non-durable, auto-delete queue with a random server-generated name
            String queueName = channel.queueDeclare().getQueue();
            // Bind the queue to the exchange with a single binding key
            channel.queueBind(queueName, EXCHANGE_NAME, "err");
            System.out.println("Waiting for messages");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(message);
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
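    The routing rule behind the direct example can be mimicked without a broker: a message reaches every queue whose binding key equals its routing key exactly. The sketch below is an illustration only. The class DirectRoutingSketch is hypothetical and not part of the RabbitMQ client, and the names queue1 and queue2 are assumptions standing in for the server-generated queues of ReceiveLogsDirect1 and ReceiveLogsDirect2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a direct exchange; not RabbitMQ code. */
class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Records that the queue is bound to this exchange with the given key. */
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        // queue1 plays the role of ReceiveLogsDirect1: three binding keys
        exchange.bind("queue1", "info");
        exchange.bind("queue1", "warning");
        exchange.bind("queue1", "err");
        // queue2 plays the role of ReceiveLogsDirect2: a single binding key
        exchange.bind("queue2", "err");

        for (String key : new String[]{"info", "warning", "err", "debug"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

    In this model only err reaches both queues, and a key with no matching binding (debug here) is routed nowhere.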

    topic mode

    Producer:

    package topic;

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Sends messages to a topic exchange.
     */
    public class EmitLogTopic {

        private static final String EXCHANGE_NAME = "TopicLogs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String message = "Animal World";
            // Routing keys are dot-separated words
            String[] routingKeys = {
                    "quick.orange.rabbit",
                    "lazy.orange.elephant",
                    "quick.orange.fox",
                    "lazy.brown.fox",
                    "lazy.pink.rabbit",
                    "quick.brown.fox",
                    "orange",
                    "quick.orange.male.rabbit",
                    "lazy.orange.male.rabbit"
            };

            // Publish the same message once per routing key
            for (String routingKey : routingKeys) {
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("Sent " + message + " routingKey: " + routingKey);
            }

            channel.close();
            connection.close();
        }
    }

    Consumer 1:

    package topic;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Receives messages whose routing key matches a specific pattern.
     */
    public class ReceiveLogsTopic1 {

        private static final String EXCHANGE_NAME = "TopicLogs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // Declare a temporary, non-durable, auto-delete queue with a random server-generated name
            String queueName = channel.queueDeclare().getQueue();
            // '*' matches exactly one word
            String routingKey = "*.orange.*";
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("Waiting for messages");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message + " routingKey: " + envelope.getRoutingKey());
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }

    Consumer 2:

    package topic;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Receives messages whose routing key matches either of two patterns.
     */
    public class ReceiveLogsTopic2 {

        private static final String EXCHANGE_NAME = "TopicLogs";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.91.130");
            factory.setUsername("admin");
            factory.setPassword("123456");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // Declare a temporary, non-durable, auto-delete queue with a random server-generated name
            String queueName = channel.queueDeclare().getQueue();
            // '*' matches exactly one word
            String routingKey = "*.*.rabbit";
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            // '#' matches zero or more words
            String routingKey2 = "lazy.#";
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
            System.out.println("Waiting for messages");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message + " routingKey: " + envelope.getRoutingKey());
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
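    To see which of the nine routing keys each topic consumer would receive, the wildcard rules can be re-implemented in a few lines: * stands for exactly one word and # for zero or more words. This is a minimal sketch for illustration, not RabbitMQ's own matching code, and the class TopicMatchSketch and its matches method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical re-implementation of topic-pattern matching; not RabbitMQ code. */
class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // words left in the pattern but none left in the key
        }
        // '*' matches exactly one word; any other word must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox",
                "orange", "quick.orange.male.rabbit", "lazy.orange.male.rabbit");
        for (String key : keys) {
            boolean consumer1 = matches("*.orange.*", key);
            boolean consumer2 = matches("*.*.rabbit", key) || matches("lazy.#", key);
            System.out.println(key + " -> consumer1=" + consumer1 + ", consumer2=" + consumer2);
        }
    }
}
```

    Under these rules quick.brown.fox, orange, and quick.orange.male.rabbit match neither consumer, while lazy.orange.male.rabbit reaches consumer 2 only through lazy.#.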