mqtt服务器搭建请看MQTT Android 开发(一)MQTT 介绍及服务器搭建 先上效果图
上图在测试时,我们需要进入webSocket 页面,直接点击连接即可,不用修改参数。演示的功能是向客户端订阅的主题发送消息,以及订阅客户端发送消息的主题,接收客户端的消息。
1.添加依赖
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'此时要注意,如果你使用的开发包是Android X ,有可能会出现这个错误
还需要添加下面这个依赖,解决上述问题,为什么会出现这个问题呢?(~ ̄(OO) ̄)ブ我也忘了。反正出现上面的问题加上就OK了,没问题更好。
implementation 'androidx.localbroadcastmanager:localbroadcastmanager:1.0.0'我使用了viewBinding,还需要在build.gradle(app)文件下的android 节点下添加下面的代码开启它
viewBinding { enabled = true }2.在清单文件中添加必要的权限
<uses-permission android:name="android.permission.WAKE_LOCK" /> <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> <uses-permission android:name="android.permission.READ_PHONE_STATE" /> <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" /> <uses-permission android:name="android.permission.INTERNET" />3.在清单文件里开启服务
<service android:name="org.eclipse.paho.android.service.MqttService" />4.具体代码如下
public class MainActivity extends AppCompatActivity implements View.OnClickListener { private static final String TAG = "MainActivity"; // MQTT 客户端 private MqttAndroidClient client; // 消息代理服务器地址 因为我是本地搭建的 所以只能用模拟器进行测试 10.0.2.2 是Android 规定的访问本地电脑的地址 1883 是MQTT非加密协议端口 private String serverURL = "tcp://10.0.2.2:1883"; // 客户端 ID,用以识别客户端 private String clientId = "HelloWorld"; // 使用ViewBinding 代替 findViewByid 可忽略 private ActivityMainBinding mBinding; // 连接参数设置 private MqttConnectOptions mConnectOptions; // 要订阅的主题 private String mTopic = "forTest"; // 要发送消息的主题 private String mTopic_send = "forSend"; /* * TextView tvLog 是为了展示效果 代替了log * */ @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); mBinding = ActivityMainBinding.inflate(getLayoutInflater()); setContentView(mBinding.getRoot()); initListener(); initConnectOptions(); } private void initConnectOptions() { // 连接设置 mConnectOptions = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mConnectOptions.setCleanSession(true); // 设置连接的用户名 mConnectOptions.setUserName("Stone"); // 设置连接的密码 mConnectOptions.setPassword("12345".toCharArray()); // 设置连接超时时间 单位为秒 mConnectOptions.setConnectionTimeout(3); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*60秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 mConnectOptions.setKeepAliveInterval(60); // 设置自动重连 mConnectOptions.setAutomaticReconnect(true); // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 mConnectOptions.setWill(mTopic, "close".getBytes(), 2, true); } private void initListener() { mBinding.btnConnect.setOnClickListener(this); mBinding.btnSubscribe.setOnClickListener(this); mBinding.btnDisConnect.setOnClickListener(this); mBinding.btnPublish.setOnClickListener(this); mBinding.btnUnSubscribe.setOnClickListener(this); } @Override public void onClick(View v) { try { switch (v.getId()) { case R.id.btn_connect: // 创建客户端 client = new MqttAndroidClient(this, serverURL, clientId); // 设置回调 client.setCallback(new MqttCallback()); // 开始连接 client.connect(mConnectOptions, null, new MqttListener()); mBinding.tvLog.setText("开始连接\n"); break; case R.id.btn_subscribe: // 订阅主题 subscribeTopic(); break; case R.id.btn_dis_connect: // 断开连接 if (client != null) { client.disconnect(); mBinding.tvLog.append("客户端已断开\n"); } break; case R.id.btn_publish: // 发布消息 publishMessage(); break; case R.id.btn_un_subscribe: // 取消订阅主题 unSubscribe(); break; default: break; } } catch (MqttException e) { } } /** * 连接服务器监听 */ class MqttListener implements IMqttActionListener { @Override public void onSuccess(IMqttToken asyncActionToken) { mBinding.tvLog.append("onSuccess: 连接成功\n"); // TODO: 2020/10/8 连接成功后该干啥干啥 } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { mBinding.tvLog.append("onFailure: 连接失败\n"); } } /** * 订阅主题 * * @throws MqttException */ private void subscribeTopic() throws MqttException { client.subscribe(mTopic, 2, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { mBinding.tvLog.append("onSuccess: 主题" + mTopic + "订阅成功\n"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { mBinding.tvLog.append("onFailure: 主题" + mTopic + "订阅失败\n"); } }); } /** * 取消订阅主题 */ private void unSubscribe() throws MqttException { if (client != null) { client.unsubscribe(mTopic, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { mBinding.tvLog.append("取消订阅主题成功\n"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { mBinding.tvLog.append("取消订阅主题失败\n"); } }); } } /** * 发布消息 * * @throws MqttException */ private void publishMessage() throws MqttException { final String msg = "This is a message"; MqttMessage message = new MqttMessage(); message.setQos(1); message.setPayload(msg.getBytes()); client.publish(mTopic_send, message, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { mBinding.tvLog.append("onSuccess: 消息" + msg + "成功发送\n"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { Log.d(TAG, "onFailure: 消息" + msg + "发送失败\n"); } }); } /** * 订阅主题的回调 */ class MqttCallback implements MqttCallbackExtended { @Override public void connectionLost(Throwable cause) { mBinding.tvLog.append("connectionLost: 连接断开,可以做重连操作\n"); } @Override public void messageArrived(String topic, MqttMessage message) { mBinding.tvLog.append("messageArrived: 接收到消息==>" + message + "\n"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { mBinding.tvLog.append("deliveryComplete: 消息成功到达broker\n"); } @Override public void connectComplete(boolean reconnect, String serverURI) { mBinding.tvLog.append("connectComplete: 连接完成\n"); } } }大多情况下,MQTT的逻辑应该放到service里面,这里进行了简化。