天天看點

mqtt session保持 訂閱消息_java – 如何檢查訂閱者是否有效接受在MQTT上為已釋出主題接收的消息...

我是MQTT的新手.

我在java中實作MQTT,我使用下面的代碼讓釋出者釋出到特定主題,

public void publish()

{

MqttClient myClient = null;

MqttConnectOptions connOpt;

try {

// Subscription with Brokers

connOpt = new MqttConnectOptions();

connOpt.setAutomaticReconnect(true);

connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.

String clientID = UUID.randomUUID().toString().replace("-", "");

System.out.println("clientID " + clientID);

myClient = new MqttClient("tcp://192.168.10.500:1883", clientID);

myClient.connect(connOpt);

String myTopic = "Device1";

MqttTopic topic = myClient.getTopic(myTopic);

int pubQoS = 0;

MqttMessage message = new MqttMessage("mqttMessage".getBytes());

message.setQos(pubQoS);

message.setRetained(false);

MqttDeliveryToken token = null;

token = topic.publish(message);

System.out.println("publish successful with the message :: " + message);

// Wait until the message has been delivered to the broker

token.waitForCompletion();

} catch (MqttException me) {

} catch (Exception e) {

}

}

然後我使用下面的代碼來讀取作為訂閱者的特定主題的已釋出消息,

public void subscribe()

{

try {

MqttConnectOptions connOpt;

// Subscription with mqttBrokerEndPoint

connOpt = new MqttConnectOptions();

connOpt.setAutomaticReconnect(true);

connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.

String clientID = UUID.randomUUID().toString().replace("-", "");

System.out.println("clientID " + clientID);

MqttSubscriber mqttConnection = new MqttSubscriber();

myClient = new MqttClient("tcp://192.168.10.500:1883" clientID);

myClient.setCallback(mqttConnection);

myClient.connect(connOpt);

myClient.subscribe("Device1");

} catch (MqttException e) {

}

}

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

try {

System.out.println(message);

boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"

//if(isValidClient) {

if(message != null) {

System.out.println("message" + message.toString());

}

myClient.unsubscribe("Device1");

myClient.disconnect();

//}

}

catch(Exception e){}

}

上面的實作工作正常.

由于我對mqtt很新,我對上述實作有一些疑問.

1)釋出者和訂閱者中的用戶端ID對于一個特定流是否應該是相同的?

或者如上所述,釋出者和訂閱者應該不同:哪些可以随機生成?

String clientID = UUID.randomUUID().toString().replace("-", "");

這個随機生成的clientID在訂閱和釋出方面都運作良好.

但是,如果我為釋出者和訂閱者使用相同的用戶端并驗證訂閱者?

我的意思是使用“clientID”myClient = new MqttClient(mqttBrokerEndPoint,“clientID”);在subsriber然後相同的“clientID”myClient = new MqttClient(mqttBrokerEndPoint,“clientID”);在出版商

我在mqtt代理控制台(使用的Windows版本)中收到以下套接字錯誤,

1549414715: Socket error on client 82, disconnecting.

1549414715: New client connected from 192.168.10.500 as clientID (c1, k60).

1549414715: No will message specified.

1549414715: Sending CONNACK to 82 (0, 0)

1549414716: New connection from 192.168.10.500 on port 1883.

1549414716: Client 82 already connected, closing old connection.

1549414716: Socket error on client 82, disconnecting.

1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).

1549414716: No will message specified.

1549414716: Sending CONNACK to 82 (0, 0)

1549414716: New connection from 192.168.10.500 on port 1883.

1549414716: Client 82 already connected, closing old connection.

1549414716: Socket error on client 82, disconnecting.

1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).

1549414716: No will message specified.

1549414716: Sending CONNACK to 82 (0, 0)

并且上述程式無效.

我們不能為訂閱者和釋出者使用相同的clientID嗎?為什麼導緻套接字錯誤,程式無效?

2)實施時是否必須使用使用者名和密碼?或者我們可以建立一個沒有以下2個屬性的連接配接?

connOpt.setUserName(USERNAME);

connOpt.setPassword(PASSWORD.toCharArray());

3)pubQoS是否必須用于釋出者?我目前正在使用它為零’0’?

MqttMessage message = new MqttMessage(mqttMessage.getBytes());

message.setQos(0);

message.setRetained(false);

也是保留屬性對于釋出者是強制性的?

4)訂閱時這兩個屬性是否必須屬性?我在代碼中使用如下.

connOpt.setAutomaticReconnect(true);

connOpt.setCleanSession(true);//if your not setting cleanSession to

false然後不應該保留訂閱.

5)此外,一旦從MQTT釋出者收到消息到MessageArrived回調,如下所示,如何驗證這是否是有效的訂閱者并繼續進一步的邏輯?

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

try {

System.out.println(message);

boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"

//if valid subscriber only i need to read message publised on the topic "Device1"

**//if(isValidClient) {**

if(message != null) {

System.out.println("message" + message.toString());

}

myClient.unsubscribe("Device1");

myClient.disconnect();

//}

}

catch(Exception e){}

我的意思是MQTT API檢查該消息是僅為該訂戶發送的屬性,他可以繼續使用消息到達回調中收到的消息?

即,MQTT api的哪個屬性可用于檢查收到的消息是否與目前的PROCESS / STEP相關.(在SUbscriber Program中,如下所示)

IS主題是訂閱者和釋出者之間唯一的共同屬性,用于在messageArrived回調中驗證訂閱者?或者我們是否有任何其他共同屬性來檢查訂閱者和釋出者之間的有效合同?

或者我們應該使用clientID來驗證訂戶?但是,如果我為訂閱者和釋出者使用相同的clientID,我将收到我在第1點提到的套接字錯誤.

怎麼進一步這個?