天天看点

mqtt session保持 订阅消息_java – 如何检查订阅者是否有效接受在MQTT上为已发布主题接收的消息...

我是MQTT的新手.

我在java中实现MQTT,我使用下面的代码让发布者发布到特定主题,

public void publish()

{

MqttClient myClient = null;

MqttConnectOptions connOpt;

try {

// Subscription with Brokers

connOpt = new MqttConnectOptions();

connOpt.setAutomaticReconnect(true);

connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.

String clientID = UUID.randomUUID().toString().replace("-", "");

System.out.println("clientID " + clientID);

myClient = new MqttClient("tcp://192.168.10.500:1883", clientID);

myClient.connect(connOpt);

String myTopic = "Device1";

MqttTopic topic = myClient.getTopic(myTopic);

int pubQoS = 0;

MqttMessage message = new MqttMessage("mqttMessage".getBytes());

message.setQos(pubQoS);

message.setRetained(false);

MqttDeliveryToken token = null;

token = topic.publish(message);

System.out.println("publish successful with the message :: " + message);

// Wait until the message has been delivered to the broker

token.waitForCompletion();

} catch (MqttException me) {

} catch (Exception e) {

}

}

然后我使用下面的代码来读取作为订阅者的特定主题的已发布消息,

public void subscribe()

{

try {

MqttConnectOptions connOpt;

// Subscription with mqttBrokerEndPoint

connOpt = new MqttConnectOptions();

connOpt.setAutomaticReconnect(true);

connOpt.setCleanSession(true);//if your not setting cleanSession to false then subscriptions shouldn't be persisted.

String clientID = UUID.randomUUID().toString().replace("-", "");

System.out.println("clientID " + clientID);

MqttSubscriber mqttConnection = new MqttSubscriber();

myClient = new MqttClient("tcp://192.168.10.500:1883" clientID);

myClient.setCallback(mqttConnection);

myClient.connect(connOpt);

myClient.subscribe("Device1");

} catch (MqttException e) {

}

}

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

try {

System.out.println(message);

boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"

//if(isValidClient) {

if(message != null) {

System.out.println("message" + message.toString());

}

myClient.unsubscribe("Device1");

myClient.disconnect();

//}

}

catch(Exception e){}

}

上面的实现工作正常.

由于我对mqtt很新,我对上述实现有一些疑问.

1)发布者和订阅者中的客户端ID对于一个特定流是否应该是相同的?

或者如上所述,发布者和订阅者应该不同:哪些可以随机生成?

String clientID = UUID.randomUUID().toString().replace("-", "");

这个随机生成的clientID在订阅和发布方面都运行良好.

但是,如果我为发布者和订阅者使用相同的客户端并验证订阅者?

我的意思是使用“clientID”myClient = new MqttClient(mqttBrokerEndPoint,“clientID”);在subsriber然后相同的“clientID”myClient = new MqttClient(mqttBrokerEndPoint,“clientID”);在出版商

我在mqtt代理控制台(使用的Windows版本)中收到以下套接字错误,

1549414715: Socket error on client 82, disconnecting.

1549414715: New client connected from 192.168.10.500 as clientID (c1, k60).

1549414715: No will message specified.

1549414715: Sending CONNACK to 82 (0, 0)

1549414716: New connection from 192.168.10.500 on port 1883.

1549414716: Client 82 already connected, closing old connection.

1549414716: Socket error on client 82, disconnecting.

1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).

1549414716: No will message specified.

1549414716: Sending CONNACK to 82 (0, 0)

1549414716: New connection from 192.168.10.500 on port 1883.

1549414716: Client 82 already connected, closing old connection.

1549414716: Socket error on client 82, disconnecting.

1549414716: New client connected from 192.168.10.500 as clientID (c1, k60).

1549414716: No will message specified.

1549414716: Sending CONNACK to 82 (0, 0)

并且上述程序无效.

我们不能为订阅者和发布者使用相同的clientID吗?为什么导致套接字错误,程序无效?

2)实施时是否必须使用用户名和密码?或者我们可以建立一个没有以下2个属性的连接?

connOpt.setUserName(USERNAME);

connOpt.setPassword(PASSWORD.toCharArray());

3)pubQoS是否必须用于发布者?我目前正在使用它为零’0’?

MqttMessage message = new MqttMessage(mqttMessage.getBytes());

message.setQos(0);

message.setRetained(false);

也是保留属性对于发布者是强制性的?

4)订阅时这两个属性是否必须属性?我在代码中使用如下.

connOpt.setAutomaticReconnect(true);

connOpt.setCleanSession(true);//if your not setting cleanSession to

false然后不应该保留订阅.

5)此外,一旦从MQTT发布者收到消息到MessageArrived回调,如下所示,如何验证这是否是有效的订阅者并继续进一步的逻辑?

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

try {

System.out.println(message);

boolean isValidClient = true;// Here i need to check if this is the valid subscriber for the message publised on topic "Device1"

//if valid subscriber only i need to read message publised on the topic "Device1"

**//if(isValidClient) {**

if(message != null) {

System.out.println("message" + message.toString());

}

myClient.unsubscribe("Device1");

myClient.disconnect();

//}

}

catch(Exception e){}

我的意思是MQTT API检查该消息是仅为该订户发送的属性,他可以继续使用消息到达回调中收到的消息?

即,MQTT api的哪个属性可用于检查收到的消息是否与当前的PROCESS / STEP相关.(在SUbscriber Program中,如下所示)

IS主题是订阅者和发布者之间唯一的共同属性,用于在messageArrived回调中验证订阅者?或者我们是否有任何其他共同属性来检查订阅者和发布者之间的有效合同?

或者我们应该使用clientID来验证订户?但是,如果我为订阅者和发布者使用相同的clientID,我将收到我在第1点提到的套接字错误.

怎么进一步这个?