文章目录
什么是MQ为何用消息队列linux下安装RocketMQRocketMQ监控平台springboot整合同步发送对象异步消息单向消息顺序消息延时消息批量消息事务消息消息过滤
RocketMQ集群实战场景(逾期订单的状态修改)实战场景(rpc下的异步处理)
什么是MQ
消息队列(Message Queue),是一种应用程序对应用程序的通信方法,是分布式系统的重要组件。
为何用消息队列
可以解决一些应用场景的高并发问题。 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用MQ来处理。 消息队列在实际应用中包括如下四个场景
解除耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;削峰填谷:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理。
linux下安装RocketMQ
安装jdk安装maven
解压缩
tar xvf apache-maven-3.6.0-bin.tar.gz
移动文件夹到/usr/local目录下
mv apache-maven-3.6.0 /usr/local/
配置maven环境变量
vim /etc/profile
文件末尾追加:
export MAVEN_HOME=/usr/local/apache-maven-3.6.0
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
刷新配置文件
source /etc/profile
修改配置文件setting.xml
<mirror>
<id>aliyun
</id>
<mirrorOf>central
</mirrorOf>
<name>aliyun maven
</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/
</url>
</mirror>
安装Git
yum -y install git (直接上传压缩包可以不用安装git)
下载RockeMQ源码
git clone -b release-4.1.0-incubating https://github.com/apache/incubator-rocketmq.git
cd incubator-rocketmq
此步骤可以文件上传
解压源码
如果没有安装unzip 执行:yum -y install unzip
unzip rocketmq-all-4.3.0-source-release.zip
移动到/user/local目录下
编译
在rocket目录下执行
mvn -Prelease-all -DskipTests clean install -U
启动
到编译之后的路径下
cd distribution/target/apache-rocketmq
先修改rocket内存大小,默认虚拟机内存不足以启动
修改bin目录下文件 runbroker.sh 修改内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
修改bin目录下文件 runserver.sh 修改内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
关闭防火墙
systemctl stop firewalld
后台启动Name Server
nohup sh bin/mqnamesrv &
查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
后台启动Broker
nohup sh bin/mqbroker -n 192.168.13.145:9876 autoCreateTopicEnable=true &
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
测试
> export NAMESRV_ADDR=localhost:9876
##发送消息
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
##接收消息
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
停止
##后启动的先停止
> sh bin/mqshutdown broker
> sh bin/mqshutdown namesrv
RocketMQ监控平台
解压
unzip rocketmq-externals-master.zip
#移动文件夹到/user/local下
>mv rocketmq-externals-master /usr/local/
#进入文件夹
>/usr/local/rocketmq-externals-master/rocketmq-console/src/main/resources
#修改配置文件
>vim application.properties
#进入文件夹
> cd /usr/local/rocketmq-externals-master/rocketmq-console/
执行编译指令
> mvn clean package -Dmaven.test.skip=true
编译成功之后会生成一个jar文件
启动指令
> java -jar target/rocketmq-console-ng-1.0.0.jar &
启动成功后,8080端口加入防火墙
请求地址:http://ip:8080/
springboot整合
添加依赖
<dependency>
<groupId>org.apache.rocketmq
</groupId>
<artifactId>rocketmq-spring-boot-starter
</artifactId>
<version>2.1.0
</version>
</dependency>
添加配置信息
rocketmq:
name-server: 192.168.13.120
:9876
producer:
group: javasm
-produncer
同步发送对象
生产者Controller调用
@RestController
public class ProducerController {
@Autowired
RocketMQTemplate rocketMQTemplate
;
@GetMapping("/send")
public String
sendMessage(){
rocketMQTemplate
.syncSend("javasmTopic","同步发送的字符串");
return "success";
}
}
消费者Listener
@Component
@RocketMQMessageListener(consumerGroup
= "javasmConsumerGroup",topic
= "javasmTopic")
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s
) {
System
.out
.println(s
);
}
}
异步消息
@GetMapping("/send")
public String
sendMessage(){
rocketMQTemplate
.asyncSend("javasmTopic", "异步发送的字符串", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult
) {
}
@Override
public void onException(Throwable throwable
) {
}
});
return "success";
}
单向消息
rocketMQTemplate
.sendOneWay("javasmTopic", "单向消息");
顺序消息
rocketMQTemplate
.syncSendOrderly("javasmTopic", "顺序消息1","abc");
rocketMQTemplate
.syncSendOrderly("javasmTopic", "顺序消息2","abc");
rocketMQTemplate
.syncSendOrderly("javasmTopic", "顺序消息3","abc");
@RocketMQMessageListener(consumerGroup
= "javasmConsumerGroup",topic
= "javasmTopic",consumeMode
= ConsumeMode
.ORDERLY
)
一个broker中有多个queue(单queue天然有序),producer轮询发送至多个queue,不能保证有序。需要保证顺序的消息生产进同一个queue,消费由一个线程消费一个queue。
顺序消息生产者
顺序消息消费者
延时消息
private String messageDelayLevel
= "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
rocketMQTemplate
.syncSend("javasmTopic", MessageBuilder
.withPayload("延迟消息").build(), 3000, 3);
批量消息
List
<Message> list
= new ArrayList<>();
for (int i
= 0; i
< 10; i
++) {
list
.add(MessageBuilder
.withPayload("批量消息"+i
).build());
}
rocketMQTemplate
.syncSend("javasmTopic",list
,4000);
事务消息
事务消息的应用场景 张三给李四转红包,怎么保证张三账户余额需要扣减,李四的账户余额需要增加,怎么保证张三账户扣钱后李四账户增加呢
问题描述: 如果是单系统或者一个数据库 完全可使用本地事务解决,但是在实际当中多个微服务可能是多库多表的 甚至都不在一个工程中,此时需要使用分布式事务 比如说seata,但是RocketMQ分布式支持事务场景
官网概念 事务消息共有三种状态,提交状态、回滚状态、中间状态: TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。 TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。 TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
生产者-Controller
System
.out
.println("张三准备给李四转钱");
String string
= UUID
.randomUUID().toString();
Message
<String> message
= MessageBuilder
.withPayload("准备转钱").setHeader(RocketMQHeaders
.TRANSACTION_ID
,string
).build();
TransactionSendResult result
= rocketMQTemplate
.sendMessageInTransaction("transactionMessageTopic",message
,null
);
本地事务监听器
@Component
@RocketMQTransactionListener
public class LocalTransactionListener implements RocketMQLocalTransactionListener {
private AtomicInteger atomicInteger
= new AtomicInteger();
private Map
<String, Integer> map
= new HashMap<>();
@Override
public RocketMQLocalTransactionState
executeLocalTransaction(Message message
, Object o
) {
String transactionId
= (String
) message
.getHeaders().get(RocketMQHeaders
.TRANSACTION_ID
);
int value
= atomicInteger
.getAndIncrement() % 3;
map
.put(transactionId
, value
);
if (value
== 0) {
System
.out
.println("张三余额扣除成功");
System
.out
.println(transactionId
+ "提交成功");
return RocketMQLocalTransactionState
.COMMIT
;
}
if (value
== 1) {
System
.out
.println("张三余额扣除失败");
System
.out
.println(transactionId
+ "事务回滚");
return RocketMQLocalTransactionState
.ROLLBACK
;
}
System
.out
.println("张三余额没啥动静");
System
.out
.println(transactionId
+ "事务无响应");
return RocketMQLocalTransactionState
.UNKNOWN
;
}
@Override
public RocketMQLocalTransactionState
checkLocalTransaction(Message message
) {
String transactionId
= (String
) message
.getHeaders().get(RocketMQHeaders
.TRANSACTION_ID
);
Integer integer
= map
.get(transactionId
);
if (integer
== 2) {
System
.out
.println(transactionId
+ "回查时提交成功");
return RocketMQLocalTransactionState
.COMMIT
;
}
return RocketMQLocalTransactionState
.ROLLBACK
;
}
}
消费者-Listener
@Component
@RocketMQMessageListener(consumerGroup
= "javasmConsumerGroup",topic
= "transactionMessageTopic")
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s
) {
System
.out
.println(s
);
System
.out
.println("李四账户添加金额");
}
}
消息过滤
消息过滤: 表示消息提供者发了很多个消息,但是我只想消费其中的某一部分消息 这就是消息过滤 。 消息过滤的方式 :
设置TagSQL表达式 (只有使用push模式的消费者才能用使用SQL92标准的sql语句)
RocketMQ集群
RocketMq 集群方式搭建 步骤教学包教包会
实战场景(逾期订单的状态修改)
第一种方式:设置定时任务遍历。费时费力,而且存在修改延迟。 第二种方式:使用redis的过期时间处理。利用Redis的setex以及订阅/发布功能实现逾期订单的状态修改,存在消息丢失。 第三种方式:使用RocketMQ(有ack机制,防消息丢失,消息至少会被投递一次。)
使用RocketMQ的延时队列机制
实战场景(rpc下的异步处理)
RocketMQ订单支付场景