RocketMQ入门

    科技2025-04-27  20

    文章目录

    什么是MQ为何用消息队列linux下安装RocketMQRocketMQ监控平台springboot整合同步发送对象异步消息单向消息顺序消息延时消息批量消息事务消息消息过滤 RocketMQ集群实战场景(逾期订单的状态修改)实战场景(rpc下的异步处理)

    什么是MQ

    消息队列(Message Queue),是一种应用程序对应用程序的通信方法,是分布式系统的重要组件。

    为何用消息队列

    可以解决一些应用场景的高并发问题。 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用MQ来处理。 消息队列在实际应用中包括如下四个场景

    解除耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;削峰填谷:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理。

    linux下安装RocketMQ

    安装jdk安装maven 解压缩 tar xvf apache-maven-3.6.0-bin.tar.gz 移动文件夹到/usr/local目录下 mv apache-maven-3.6.0 /usr/local/ 配置maven环境变量 vim /etc/profile 文件末尾追加: export MAVEN_HOME=/usr/local/apache-maven-3.6.0 export MAVEN_HOME export PATH=${PATH}:${MAVEN_HOME}/bin 刷新配置文件 source /etc/profile 修改配置文件setting.xml <mirror> <id>aliyun</id> <mirrorOf>central</mirrorOf> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </mirror> 安装Git yum -y install git (直接上传压缩包可以不用安装git) 下载RockeMQ源码 git clone -b release-4.1.0-incubating https://github.com/apache/incubator-rocketmq.git cd incubator-rocketmq 此步骤可以文件上传 解压源码 如果没有安装unzip 执行:yum -y install unzip unzip rocketmq-all-4.3.0-source-release.zip 移动到/user/local目录下 编译 在rocket目录下执行 mvn -Prelease-all -DskipTests clean install -U 启动

    到编译之后的路径下 cd distribution/target/apache-rocketmq 先修改rocket内存大小,默认虚拟机内存不足以启动 修改bin目录下文件 runbroker.sh 修改内存大小 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m" 修改bin目录下文件 runserver.sh 修改内存大小 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m" 关闭防火墙 systemctl stop firewalld 后台启动Name Server nohup sh bin/mqnamesrv & 查看日志 tail -f ~/logs/rocketmqlogs/namesrv.log 后台启动Broker nohup sh bin/mqbroker -n 192.168.13.145:9876 autoCreateTopicEnable=true & 查看日志 tail -f ~/logs/rocketmqlogs/broker.log 测试 > export NAMESRV_ADDR=localhost:9876 ##发送消息 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer ##接收消息 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 停止 ##后启动的先停止 > sh bin/mqshutdown broker > sh bin/mqshutdown namesrv

    RocketMQ监控平台

    解压 unzip rocketmq-externals-master.zip #移动文件夹到/user/local下 >mv rocketmq-externals-master /usr/local/ #进入文件夹 >/usr/local/rocketmq-externals-master/rocketmq-console/src/main/resources #修改配置文件 >vim application.properties

    #进入文件夹 > cd /usr/local/rocketmq-externals-master/rocketmq-console/ 执行编译指令 > mvn clean package -Dmaven.test.skip=true 编译成功之后会生成一个jar文件

    启动指令 > java -jar target/rocketmq-console-ng-1.0.0.jar & 启动成功后,8080端口加入防火墙 请求地址:http://ip:8080/

    springboot整合

    添加依赖 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> 添加配置信息 rocketmq: name-server: 192.168.13.120:9876 producer: group: javasm-produncer

    同步发送对象

    生产者Controller调用 @RestController public class ProducerController { @Autowired RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String sendMessage(){ //发送同步消息-字符串 //javasmTopic主题名字 rocketMQTemplate.syncSend("javasmTopic","同步发送的字符串"); return "success"; } } 消费者Listener @Component @RocketMQMessageListener(consumerGroup = "javasmConsumerGroup",topic = "javasmTopic") public class ConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println(s); } }

    异步消息

    @GetMapping("/send") public String sendMessage(){ rocketMQTemplate.asyncSend("javasmTopic", "异步发送的字符串", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //发送成功执行 } @Override public void onException(Throwable throwable) { //发送异常执行 } }); return "success"; }

    单向消息

    rocketMQTemplate.sendOneWay("javasmTopic", "单向消息");

    顺序消息

    // hashkey相同的消息会发送到同一个queue rocketMQTemplate.syncSendOrderly("javasmTopic", "顺序消息1","abc"); rocketMQTemplate.syncSendOrderly("javasmTopic", "顺序消息2","abc"); rocketMQTemplate.syncSendOrderly("javasmTopic", "顺序消息3","abc"); // 消费者-Listener也要修改 @RocketMQMessageListener(consumerGroup = "javasmConsumerGroup",topic = "javasmTopic",consumeMode = ConsumeMode.ORDERLY)

    一个broker中有多个queue(单queue天然有序),producer轮询发送至多个queue,不能保证有序。需要保证顺序的消息生产进同一个queue,消费由一个线程消费一个queue。

    顺序消息生产者

    顺序消息消费者

    延时消息

    /** 延时消息一般使用在一定的场景中,比如我们12306买火车票 下完订单之后 30分钟如果没有支付的话 订单自动取消 此时就需要用到延时消息 我们可以在下单的时候发送一个30分钟的延时时间 等到30分钟之后 消费者自动收到通知,收到通知之后,可以查询订单状态 如果没有支付取消订单 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 */ private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; //timeout连接超时的时间 //delayLevel延时的等级 rocketMQTemplate.syncSend("javasmTopic", MessageBuilder.withPayload("延迟消息").build(), 3000, 3);

    批量消息

    List<Message> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add(MessageBuilder.withPayload("批量消息"+i).build()); } rocketMQTemplate.syncSend("javasmTopic",list,4000);

    事务消息

    事务消息的应用场景 张三给李四转红包,怎么保证张三账户余额需要扣减,李四的账户余额需要增加,怎么保证张三账户扣钱后李四账户增加呢

    问题描述: 如果是单系统或者一个数据库 完全可使用本地事务解决,但是在实际当中多个微服务可能是多库多表的 甚至都不在一个工程中,此时需要使用分布式事务 比如说seata,但是RocketMQ分布式支持事务场景

    官网概念 事务消息共有三种状态,提交状态、回滚状态、中间状态: TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。 TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。 TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

    生产者-Controller System.out.println("张三准备给李四转钱"); String string = UUID.randomUUID().toString(); Message<String> message = MessageBuilder.withPayload("准备转钱").setHeader(RocketMQHeaders.TRANSACTION_ID,string).build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionMessageTopic",message,null); 本地事务监听器 @Component @RocketMQTransactionListener//注解标注的类 将会监听本地事务的提交情况 public class LocalTransactionListener implements RocketMQLocalTransactionListener { /* * 这个方法表示执行本地事务 这个方法执行时间是向broker发送预处理消息收到回复之后 就会走这个回调函数 * 如果这个本地事务执行提交成功 消费者可以消费 如果回滚 则Broker会回滚消息 如果不返回或者返回的是UNKNOWN * 则 默认情况下预处理消息发送一分钟后 Broker通知Producer 检查本地事务 在checkLocalTransaction返回事务提交情况 * */ private AtomicInteger atomicInteger = new AtomicInteger(); private Map<String, Integer> map = new HashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); int value = atomicInteger.getAndIncrement() % 3; map.put(transactionId, value); if (value == 0) { //提交成功 System.out.println("张三余额扣除成功"); System.out.println(transactionId + "提交成功"); return RocketMQLocalTransactionState.COMMIT; } if (value == 1) { System.out.println("张三余额扣除失败"); System.out.println(transactionId + "事务回滚"); return RocketMQLocalTransactionState.ROLLBACK; } System.out.println("张三余额没啥动静"); System.out.println(transactionId + "事务无响应"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); Integer integer = map.get(transactionId); if (integer == 2) { System.out.println(transactionId + "回查时提交成功"); return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } } 消费者-Listener @Component @RocketMQMessageListener(consumerGroup = "javasmConsumerGroup",topic = "transactionMessageTopic") public class ConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println(s); System.out.println("李四账户添加金额"); } }

    消息过滤

    消息过滤: 表示消息提供者发了很多个消息,但是我只想消费其中的某一部分消息 这就是消息过滤 。 消息过滤的方式 :

    设置TagSQL表达式 (只有使用push模式的消费者才能用使用SQL92标准的sql语句)

    RocketMQ集群

    RocketMq 集群方式搭建 步骤教学包教包会

    实战场景(逾期订单的状态修改)

    第一种方式:设置定时任务遍历。费时费力,而且存在修改延迟。 第二种方式:使用redis的过期时间处理。利用Redis的setex以及订阅/发布功能实现逾期订单的状态修改,存在消息丢失。 第三种方式:使用RocketMQ(有ack机制,防消息丢失,消息至少会被投递一次。)

    使用RocketMQ的延时队列机制

    实战场景(rpc下的异步处理)

    RocketMQ订单支付场景

    Processed: 0.010, SQL: 8