另外一個MQTT釋出訂閱用戶端paho-mqtt-client或mqttv3采用回調的方式實作消息的接收,下面看一下實作:
1.消息接收回調類
[java]
view plain
copy
?
- package cn.smartslim.mqtt.demo.paho;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.MqttTopic;
- /**
- * 釋出消息的回調類
- *
- * 必須實作MqttCallback的接口并實作對應的相關接口方法
- * ◦CallBack 類将實作 MqttCallBack。每個客戶機辨別都需要一個回調執行個體。在此示例中,構造函數傳遞客戶機辨別以另存為執行個體資料。在回調中,将它用來辨別已經啟動了該回調的哪個執行個體。
- * ◦必須在回調類中實作三個方法:
- * public void messageArrived(MqttTopic topic, MqttMessage message)
- * 接收已經預訂的釋出。
- * public void connectionLost(Throwable cause)
- * 在斷開連接配接時調用。
- * public void deliveryComplete(MqttDeliveryToken token))
- * 接收到已經釋出的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
- * ◦由 MqttClient.connect 激活此回調。
- */
- public class PushCallback implements MqttCallback {
- public void connectionLost(Throwable cause) {
- // 連接配接丢失後,一般在這裡面進行重連
- System.out.println("連接配接斷開,可以做重連");
- }
- public void deliveryComplete(MqttDeliveryToken token) {
- // publish後會執行到這裡
- System.out.println("deliveryComplete---------"+ token.isComplete());
- public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
- // subscribe後得到的消息會執行到這裡面
- System.out.println("接收消息主題:"+topic.getName());
- System.out.println("接收消息Qos:"+message.getQos());
- System.out.println("接收消息内容:"+new String(message.getPayload()));
- }
2.服務端釋出消息
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
- import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
- public class Server {
- public static final String HOST = "tcp://192.168.36.102:1883";
- public static final String TOPIC = "tokudu/yzq124";
- private static final String clientid ="server";
- private MqttClient client;
- private MqttTopic topic;
- private String userName = "test";
- private String passWord = "test";
- private MqttMessage message;
- public Server() throws MqttException {
- //MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
- connect();
- private void connect() {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(false);
- options.setUserName(userName);
- options.setPassword(passWord.toCharArray());
- // 設定逾時時間
- options.setConnectionTimeout(10);
- // 設定會話心跳時間
- options.setKeepAliveInterval(20);
- try {
- client.setCallback(new PushCallback());
- client.connect(options);
- topic = client.getTopic(TOPIC);
- } catch (Exception e) {
- e.printStackTrace();
- }
- public void publish(MqttMessage message) throws MqttPersistenceException, MqttException{
- MqttDeliveryToken token = topic.publish(message);
- token.waitForCompletion();
- System.out.println(token.isComplete()+"========");
- public static void main(String[] args) throws MqttException {
- Server server = new Server();
- server.message = new MqttMessage();
- server.message.setQos(1);
- server.message.setRetained(true);
- server.message.setPayload("eeeeeaaaaaawwwwww---".getBytes());
- server.publish(server.message);
- System.out.println(server.message.isRetained()+"------ratained狀态");
3.用戶端接收消息
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- import org.eclipse.paho.client.mqttv3.MqttSecurityException;
- public class Client {
- private static final String clientid = "client";
- private MqttConnectOptions options;
- private ScheduledExecutorService scheduler;
- //重新連結
- public void startReconnect() {
- scheduler = Executors.newSingleThreadScheduledExecutor();
- scheduler.scheduleAtFixedRate(new Runnable() {
- public void run() {
- if (!client.isConnected()) {
- try {
- client.connect(options);
- } catch (MqttSecurityException e) {
- e.printStackTrace();
- } catch (MqttException e) {
- }
- }
- }
- }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
- private void start() {
- // host為主機名,test為clientid即連接配接MQTT的用戶端ID,一般以用戶端唯一辨別符表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
- // MQTT的連接配接設定
- options = new MqttConnectOptions();
- // 設定是否清空session,這裡如果設定為false表示伺服器會保留用戶端的連接配接記錄,這裡設定為true表示每次連接配接到伺服器都以新的身份連接配接
- options.setCleanSession(true);
- // 設定連接配接的使用者名
- options.setUserName(userName);
- // 設定連接配接的密碼
- options.setPassword(passWord.toCharArray());
- // 設定逾時時間 機關為秒
- options.setConnectionTimeout(10);
- // 設定會話心跳時間 機關為秒 伺服器會每隔1.5*20秒的時間向用戶端發送個消息判斷用戶端是否線上,但這個方法并沒有重連的機制
- options.setKeepAliveInterval(20);
- // 設定回調
- client.setCallback(new PushCallback());
- MqttTopic topic = client.getTopic(TOPIC);
- //setWill方法,如果項目中需要知道用戶端是否掉線可以調用該方法。設定最終端口的通知消息
- options.setWill(topic, "close".getBytes(), 0, true);
- client.connect(options);
- //訂閱消息
- int[] Qos = {1};
- String[] topic1 = {TOPIC};
- client.subscribe(topic1, Qos);
- e.printStackTrace();
- public void disconnect() {
- try {
- client.disconnect();
- } catch (MqttException e) {
- public static void main(String[] args) throws MqttException {
- Client client = new Client();
- client.start();