天天看點

mqtt session保持 訂閱消息_paho的mqtt基本情況以及重連以及重新訂閱機制設定

先講一下paho的mqtt連接配接的java實作情況

1、paho的mqtt底層是采用三個線程進行異步的消息發送、處理和接收的【debug的時候可以看到三個線程】,然後比較坑的是,在處理消息的時候,如果有運作是異常抛出但是沒有處理的話,整個mqtt用戶端直接斷開連接配接。

2、然後就是底層paho提供了兩個用戶端連接配接實作——MqttClient和MqttAsyncClient。前者是同步的,後者是異步的,主要是把連接配接建立等耗時操作進行異步處理,一般使用方式為

3、最後mqtt的對于消息的處理是采用回調的方式,同時,對于收發消息可以采用注冊監聽器的方式進行進度的監聽,具體使用可以參看paho項目的GitHub上的示例,上面有三個比較全的示例4、關于MqttClientPersistence底下的兩個類MemoryPersistence和MqttDefaultFilePersistence,主要是為了消息傳送過程中的一個臨時緩存,如Qos為1,2的消息

重連的思路

針對mqtt協定的原本用途——低網絡品質環境,重連是必須的。目前的話重連有幾種思路

1、在回調函數裡面設定當mqtt用戶端連接配接丢失時重新連接配接

2、在連接配接參數裡面設定重連方法org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(boolean),個人推薦使用第二種方式

下面附上自己的采用第一種方式設定的mqtt用戶端以及回調類

import java.io.UnsupportedEncodingException;

import java.util.List;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttSecurityException;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient {

String clientId;

private MemoryPersistence persistence = new MemoryPersistence();

// Private instance variables

private MqttClient client;

private String brokerUrl;

private MqttConnectOptions conOpt;

private boolean clean;

private String password;

private String userName;

//需要重新訂閱的主題

private List topicList;

public MyMqttClient() {

super();

}

public MyMqttClient(String brokerUrl, String clientId, boolean cleanSession, String userName, String password)

throws MqttException {

super();

this.brokerUrl = brokerUrl;

this.clientId = clientId;

this.clean = cleanSession;

this.password = password;

this.userName = userName;

// 建立mqtt連接配接屬性

this.conOpt = new MqttConnectOptions();

this.conOpt.setConnectionTimeout(60);

//this.conOpt.setKeepAliveInterval(60);

this.conOpt.setCleanSession(true);

// 初始化用戶端

this.client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());

this.client.setCallback(new MyMqttCallback(this));

}

public List getTopicList() {

return topicList;

}

public void setTopicList(List topicList) {

this.topicList = topicList;

}

public void connect() {

try {

if (!this.client.isConnected()) {

this.client.connect(this.conOpt);

}

} catch (MqttException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

public void subscribe(String topicName, int qos) {

try {

this.client.subscribe(topicName, qos);

} catch (MqttException e) {

e.printStackTrace();

}

}

public void publish(String topicName, String message, int qos) {

try {

MqttMessage mqttMessage = new MqttMessage();

mqttMessage.setQos(qos);

mqttMessage.setPayload(message.getBytes("utf-8"));

this.client.publish(topicName, mqttMessage);

} catch (MqttException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

public void close() {

try {

this.client.disconnect();

this.client.close();

} catch (MqttException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

public void reConnect() throws MqttSecurityException, MqttException {

if (null != this.client) {

if(!this.client.isConnected()) {

client.connect(this.conOpt);

}else {

this.client.disconnect();

this.client.connect(this.conOpt);

}

}

}

}

import java.sql.CallableStatement;

import java.sql.Connection;

import java.sql.SQLException;

import java.util.List;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.example.util.Config;

import com.example.util.HikariCPUtil;

public class MyMqttCallback implements MqttCallback {

private static final Logger logger = LoggerFactory.getLogger(MyMqttCallback.class);

private static final ExecutorService pool = Executors.newFixedThreadPool(5);

private MyMqttClient myClient;

public MyMqttCallback(MyMqttClient myClient) {

super();

this.myClient = myClient;

}

@Override

public void connectionLost(Throwable cause) {

logger.error("連接配接丢失,原因{}",cause);

// 連接配接丢失後,一般在這裡面進行重連

while (true) {

try {

Thread.sleep(30000);

myClient.reConnect();

List topicList = this.myClient.getTopicList();

for (String topic : topicList) {

this.myClient.subscribe(topic, Config.QOS);

}

logger.info("mqtt重新連接配接,重新訂閱!");

break;

} catch (Exception e) {

e.printStackTrace();

continue;

}

}

}

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

//消息處理

}

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

// TODO Auto-generated method stub

}

}

關于重連之後的主題重新

主題重新訂閱這個目前主要有兩種實作方式,具體看需求

1、設定連接配接屬性的MqttConnectOptions.setCleanSession(false),然後設定mqtt用戶端的主題固定,重連上之後之前的主題保留,這個和mqtt的broker關系比較大

2、采用MqttCallbackExtended這個回調類,在org.eclipse.paho.client.mqttv3.MqttCallbackExtended.connectComplete(boolean, String)這個方法裡面實作主題的重新訂閱,這個一般結合org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(boolean)使用

3、像我上面的例子一樣,在包裝類裡面緩存之前的topic,在短信重連成功的代碼裡面進行重新訂閱即可

最後

代碼都是一步步晚上,不要想着拿着我的代碼就去用,能用,不保證會不會出什麼bug的