描述
困擾我半年的一個問題,我訂閱mqtt主題的消息,運作一段時間以後就會不行,檢視日志,就是不斷地重連成功,進入失敗回調函數中,然後又連接配接成功,又失敗,就是無法恢複正常;
這種頻繁連結成功,然後失敗的原因是設定的連接配接逾時時間太短了,我之前設定的是1;現在改成60;
options.setConnectionTimeout(50);
問題一直在那裡,你不花心思去解決是不可能的,随着時間的增長,帶來的負面影響非常巨大
思路
1、訂閱主題消息
2、斷開網絡,再重連網絡
3、檢視這個時候程式是否能夠接受資料;反複測試
主程式
package Mqtt;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* MQTT用戶端訂閱消息類
*
* @author zhongyulin
*/
@NoArgsConstructor
@Data
public class MqttSource extends RichParallelSourceFunction<String> {
//阻塞隊列存儲訂閱的消息
public static BlockingQueue<String> queue;
public Logger logger = LoggerFactory.getLogger(this.getClass());
private String topic;
public static MqttClient client;
private String mqttt_pre;
public MqttSource(String topic, String mqttt_pre) {
this.mqttt_pre = mqttt_pre;
this.topic = topic;
}
/**
* mqtt連接配接配置
*
* @return
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("admin");
options.setPassword("Tgdg_2022".toCharArray());
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(50);
options.setKeepAliveInterval(60);
return options;
}
/**
* mqtt連接配接伺服器以及重連代碼
*/
public void Reconnect(String topic) {
try {
client = new MqttClient("tcp://11.11.1.170:1883", System.currentTimeMillis() + new Random().nextLong() + "_", new MemoryPersistence());
client.connect(new MqttSource().getOptions());
if (client.isConnected()) {
client.subscribe(topic, 0);
client.setCallback(new MqttCallBack(topic));
} else {
if (client != null && client.isConnected()) {
client.disconnect();
client.close();
}
if (client != null) {
client.close();
}
}
} catch (MqttException e) {
// 處理連接配接逾時的情況,例如重連
logger.error("mqtt Reconnect fail,into MqttException!");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
//重新連結
Reconnect(topic);
}
}
@Override
public void open(Configuration parameters) {
Reconnect(topic);
}
//flink線程啟動函數
@Override
public void run(SourceContext<String> ctx) throws Exception {
queue = new SynchronousQueue<>(false);
//利用死循環使得程式一直監控主題是否有新消息
while (true) {
//使用阻塞隊列的好處是隊列空的時候程式會一直阻塞到這裡不會浪費CPU資源
ctx.collect(queue.take());
}
}
@Override
public void cancel() {
}
}
失敗回調函數
package Mqtt;
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class MqttCallBack implements MqttCallback {
private static Logger logger = LoggerFactory.getLogger(MqttCallBack.class);
//訂閱的主題消息
private String topic;
public MqttCallBack(String topic) {
this.topic = topic;
}
//連接配接失敗回調該函數
@Override
public void connectionLost(Throwable throwable) {
try {
logger.error("mqtt into MqttCallBack connectionLost function!");
if (MqttSource.client != null && MqttSource.client.isConnected()) {
MqttSource.client.disconnect();
MqttSource.client.close();
}
if (MqttSource.client != null) {
MqttSource.client.close();
}
TimeUnit.SECONDS.sleep(10);
logger.error("MqttCallBack try to Reconnect!");
new MqttSource().Reconnect(topic);
} catch (Exception e) {
e.printStackTrace();
if (e instanceof MqttException) {
logger.error("MqttCallBack into to MqttException!");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
logger.error("MqttCallBack into to MqttException,begin to Reconnect!");
new MqttSource().Reconnect(topic);
}
}
}
//收到消息回調該函數
@Override
public void messageArrived(String s, MqttMessage message) throws Exception {
JSONObject object = null;
String msg = "";
try {
msg = new String(message.getPayload());
} catch (NullPointerException e) {
logger.error("messageArrived NullPointerException");
return; // 提前退出方法
}
try {
MqttSource.queue.put(msg);
} catch (InterruptedException e) {
logger.error("messageArrived InterruptedException" + e.getMessage());
return; // 提前退出方法
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
總結問題所在
1、每次建立mqtt連接配接的時候,判斷是否連接配接上,若連接配接上就訂閱主題,必須訂閱主題
2、每次抛異常MqttException這裡,需要休眠幾秒,然後重連;我之前都是直接try catch;
try catch隻是捕獲異常,異常是需要處理的,這個時候mqtt已經出問題了,不正常的,是以後面一直都是不正常的;
3、很多人在回調函數的失敗重連機制這裡,使用死循環,如果你抛錯,直接進入MqttException裡面,你在異常這裡不處理,隻會不斷地列印錯誤日志