springboot集成MQTT实现消息收发,断线重连

    科技2022-07-10  113

    springboot集成MQTT实现消息收发,断线重连

    springboot中集成netty。我在代码里用到了lombok的@Slf4j注解输出日志日志。

    mqtt配置

    配置 host 里的ip换成自己服务器的公网ip mqtt服务器如果设置了用户名和密码需要填写,没设置就不需要

    spring: application: name: mqtt profiles: active: local #local:配置的是本地或测试环境 dev:配置的是生产环境 mqtt: # 服务器连接地址,如果有多个,用逗号隔开 host: tcp://ip:1883 # 连接服务器默认客户端ID clientId: mqtt_client_id_001 # 默认的消息推送主题,实际可在调用接口时指定 topic: mqtt_topic_001,mqtt_topic_002,mqtt_topic_003 # 用户名 username: # 密码 password: # 连接超时 timeout: 30 # 心跳 keepalive: 30

    maven依赖

    <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!--mqtt--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

    获取配置信息工具类

    可以使用注解 @Value获取配置信息 我使用的是下面这种方式

    import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * 获取配置信息 **/ public class PropertiesUtil { public static String MQTT_HOST; public static String MQTT_CLIENT_ID; public static String MQTT_USER_NAME; public static String MQTT_PASSWORD; public static String MQTT_TOPIC; public static Integer MQTT_TIMEOUT; public static Integer MQTT_KEEP_ALIVE; /** * mqtt配置 */ static { Properties properties = loadMqttProperties(); MQTT_HOST = properties.getProperty("host"); MQTT_CLIENT_ID = properties.getProperty("clientId"); MQTT_USER_NAME = properties.getProperty("username"); MQTT_PASSWORD = properties.getProperty("password"); MQTT_TOPIC = properties.getProperty("topic"); MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout")); MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive")); } private static Properties loadMqttProperties() { InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml"); Properties properties = new Properties(); try { properties.load(inputstream); return properties; } catch (IOException e) { throw new RuntimeException(e); } finally { try { if (inputstream != null) { inputstream.close(); } } catch (IOException e) { throw new RuntimeException(e); } } } }

    mqtt客户端

    import com.common.utils.PropertiesUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttConsumer implements ApplicationRunner { private static MqttClient client; @Override public void run(ApplicationArguments args) { log.info("初始化并启动mqtt......"); this.connect(); } /** * 连接mqtt服务器 */ private void connect() { try { // 1 创建客户端 getClient(); // 2 设置配置 MqttConnectOptions options = getOptions(); String[] topic = PropertiesUtil.MQTT_TOPIC.split(","); // 3 消息发布质量 int[] qos = getQos(topic.length); // 4 最后设置 create(options, topic, qos); } catch (Exception e) { log.error("mqtt连接异常:" + e); } } /** * 创建客户端 --- 1 --- */ public void getClient() { try { if (null == client) { client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence()); } log.info("--创建mqtt客户端"); } catch (Exception e) { log.error("创建mqtt客户端异常:" + e); } } /** * 生成配置对象,用户名,密码等 --- 2 --- */ public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); //options.setUserName(PropertiesUtil.MQTT_USER_NAME); //options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray()); // 设置超时时间 options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT); // 设置会话心跳时间 options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE); // 是否清除session options.setCleanSession(false); log.info("--生成mqtt配置对象"); return options; } /** * qos --- 3 --- */ public int[] getQos(int length) { int[] qos = new int[length]; for (int i = 0; i < length; i++) { /** * MQTT协议中有三种消息发布服务质量: * * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。 * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大 */ qos[i] = 1; } log.info("--设置消息发布质量"); return qos; } /** * 装在各种实例和订阅主题 --- 4 --- */ public void create(MqttConnectOptions options, String[] topic, int[] qos) { try { client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); log.info("--添加回调处理类"); client.connect(options); } catch (Exception e) { log.info("装载实例或订阅主题异常:" + e); } } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { log.info("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布,非持久化 * * qos根据文档设置为1 * * @param topic * @param msg */ public static void publish(String topic, String msg) { publish(1, false, topic, msg); } /** * 发布 */ public static void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { log.error("topic:" + topic + " 不存在"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); if (!token.isComplete()) { log.info("消息发送成功"); } } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } }

    mqtt客户端回调类

    import com.common.utils.PropertiesUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import java.util.Arrays; /** * mqtt回调处理类 */ @Slf4j public class MqttConsumerCallback implements MqttCallbackExtended { private MqttClient client; private MqttConnectOptions options; private String[] topic; private int[] qos; public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) { this.client = client; this.options = options; this.topic = topic; this.qos = qos; } /** * 断开重连 */ @Override public void connectionLost(Throwable cause) { log.info("MQTT连接断开,发起重连......"); try { if (null != client && !client.isConnected()) { client.reconnect(); log.error("尝试重新连接"); } else { client.connect(options); log.error("尝试建立新连接"); } } catch (Exception e) { e.printStackTrace(); } } /** * 接收到消息调用令牌中调用 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { //log.info("deliveryComplete---------" + Arrays.toString(topic)); } /** * 消息处理 */ @Override public void messageArrived(String topic, MqttMessage message) { try { String msg = new String(message.getPayload()); log.info("收到topic:" + topic + " 消息:" + msg); } catch (Exception e) { log.info("处理mqtt消息异常:" + e); } } /** * mqtt连接后订阅主题 */ @Override public void connectComplete(boolean b, String s) { try { if (null != topic && null != qos) { if (client.isConnected()) { client.subscribe(topic, qos); log.info("mqtt连接成功,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID); log.info("--订阅主题::" + Arrays.toString(topic)); } else { log.info("mqtt连接失败,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID); } } } catch (Exception e) { log.info("mqtt订阅主题异常:" + e); } }

    编写测试controller

    @RestController @RequestMapping("/test") public class TestController { /** * 测试推送消息 */ @ResponseBody @GetMapping(value = "/push") public Object push(@Param("topic") String topic, @Param("msg") String msg) { MqttConsumer.publish(topic, msg); return "测试成功"; }

    测试日志

    测试需要搭建MQTT服务器 MQTT服务器搭建地址

    如果文中有问题,欢迎指出,大家一起学习进步。

    Processed: 0.015, SQL: 8