重要參考文章來源:http://gigi.nullneuron.net/gigilabs/resilient-connections-with-rabbitmq-net-client/
參考代碼:https://bitbucket.org/dandago/gigilabs/src/bba0d457869f3283fa9f47a52e9bc009f29afc9d/ResilientRabbitMqConnectivity/?at=master
原因是這樣的,我在Windows用戶端有一個Windows背景服務,負責與服務端的資料互動,資料上傳及資料下載下傳
1.資料上傳部分是使用的rabbitmq donnet庫發送消息至RabbittMQ伺服器,伺服器另外有一個應用程式會監控RabbitMQ伺服器的指定隊列,完成資料的上傳服務
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iN1gTM2IzNxkTMtAzM5YzM2IDMxETM1ATOxAjMtITM2kTOy8CX1ATOxAjMvwlMxYTO5IzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
這樣的架構還是比較好用的,使用的效果目前還行,但遇到一個比較頭痛的問題,Windows背景服務一直在Windows平闆電腦上運作,除非手動安裝及更新應用時,才會将Windows服務進行重新安裝或重新開機,其他的情況是不會進行重新開機的
了解到RabbitMQ是有自動重連的技術的,可以參考位址:https://yq.aliyun.com/articles/369969
這個效果隻作用于,伺服器沒有挂掉,隻是中間有一些網絡問題時才可以進行重連
但有一種情況是沒有處理到的,我們已經在用戶端對RabbitMQ某個隊列進行監控,但伺服器突然挂掉,然後幾分鐘後重新啟動了,這時,用戶端可以重建立立連接配接,但卻不會自動對隊列産生監控,無法拿到消息
現時對代碼做出一些處理
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System.Threading;
using RabbitMQ.Client.Events;
using System.IO;
namespace MES_MonitoringService.Common
{
public class RabbitMQClientHandler
{
private static string defaultRabbitMQHostName = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerHostName");
private static string defaultRabbitMQPort = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerPort");
private static string defaultRabbitMQUserName = Common.ConfigFileHandler.GetAppConfig("RabbitMQUserName");
private static string defaultRabbitMQPassword = Common.ConfigFileHandler.GetAppConfig("RabbitMQPassword");
private static string defaultRabbitVirtualHost = Common.ConfigFileHandler.GetAppConfig("RabbitMQVirtualHost");
// 定義一個靜态變量來儲存類的執行個體
private static RabbitMQClientHandler uniqueInstance;
//定義一個辨別確定線程同步
private static readonly object locker = new object();
/*-------------------------------------------------------------------------------------*/
//ConnectionFactory
private static ConnectionFactory mc_ConnectionFactory = null;
//Connection
private static IConnection Connection;
//發送頻道及接收頻道分開,避免互相影響,導緻整個服務不可用
//Send Channel
private static IModel SendChannel;
//Listen Channel
private static IModel ListenChannel;
//資料監控隊列
private static string MC_SyncDataConsume = string.Empty;
//
//private SyncDataHandler syncDataHandlerClass;
/*-------------------------------------------------------------------------------------*/
/// <summary>
/// 定義私有構造函數,使外界不能建立該類執行個體
/// </summary>
public RabbitMQClientHandler()
{
Reconnect();
}
/// <summary>
/// 定義公有方法提供一個全局通路點,同時你也可以定義公有屬性來提供全局通路點
/// </summary>
/// <returns></returns>
public static RabbitMQClientHandler GetInstance()
{
// 當第一個線程運作到這裡時,此時會對locker對象 "加鎖",
// 當第二個線程運作該方法時,首先檢測到locker對象為"加鎖"狀态,該線程就會挂起等待第一個線程解鎖
// lock語句運作完之後(即線程運作完之後)會對該對象"解鎖"
// 雙重鎖定隻需要一句判斷就可以了
if (uniqueInstance == null)
{
lock (locker)
{
// 如果類的執行個體不存在則建立,否則直接傳回
if (uniqueInstance == null)
{
uniqueInstance = new RabbitMQClientHandler();
}
}
}
return uniqueInstance;
}
static void Connect()
{
try
{
Common.LogHandler.WriteLog("擷取RabbitMQ伺服器參數:" + defaultRabbitMQHostName + ":" + defaultRabbitMQPort + " (" + defaultRabbitMQUserName + "/" + defaultRabbitMQPassword + ")");
//連接配接工廠
mc_ConnectionFactory = new ConnectionFactory();
//連接配接工廠資訊
mc_ConnectionFactory.HostName = defaultRabbitMQHostName;// "localhost";
int rabbitmq_port = 5672;// 預設是5672端口
int.TryParse(defaultRabbitMQPort, out rabbitmq_port);
mc_ConnectionFactory.Port = rabbitmq_port;// "5672"
mc_ConnectionFactory.UserName = defaultRabbitMQUserName;// "guest";
mc_ConnectionFactory.Password = defaultRabbitMQPassword;// "guest";
mc_ConnectionFactory.VirtualHost = defaultRabbitVirtualHost;// "/"
mc_ConnectionFactory.RequestedHeartbeat = 30;//心跳包
mc_ConnectionFactory.AutomaticRecoveryEnabled = true;//自動重連
mc_ConnectionFactory.TopologyRecoveryEnabled = true;//拓撲重連
mc_ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
//建立連接配接
Connection = mc_ConnectionFactory.CreateConnection();
//斷開連接配接時,調用方法自動重連
Connection.ConnectionShutdown += Connection_ConnectionShutdown;
//建立發送頻道
SendChannel = Connection.CreateModel();
//建立接收頻道
ListenChannel = Connection.CreateModel();
//發送頻道确認模式,發送了消息後,可以收到回應
SendChannel.ConfirmSelect();
if(!string.IsNullOrEmpty(MC_SyncDataConsume))
{
//重新監控消息
RabbitmqMessageConsume(MC_SyncDataConsume);
}
Common.LogHandler.WriteLog("嘗試連接配接至RabbitMQ伺服器:" + defaultRabbitMQHostName);
}
catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e)
{
throw e;
}
catch (Exception ex)
{
throw ex;
}
}
static void Cleanup()
{
try
{
if (SendChannel != null && SendChannel.IsOpen)
{
try
{
SendChannel.Close();
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ重新連接配接,正在嘗試關閉之前的Channel[發送],但遇到錯誤", ex);
}
SendChannel = null;
}
if (ListenChannel != null && ListenChannel.IsOpen)
{
try
{
ListenChannel.Close();
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ重新連接配接,正在嘗試關閉之前的Channel[接收],但遇到錯誤", ex);
}
ListenChannel = null;
}
if (Connection != null && Connection.IsOpen)
{
try
{
Connection.Close();
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ重新連接配接,正在嘗試關閉之前的連接配接,但遇到錯誤", ex);
}
Connection = null;
}
}
catch (IOException ex)
{
throw ex;
}
}
private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
LogHandler.WriteLog("): RabbitMQ已經斷開連接配接,正在嘗試重新連接配接至RabbitMQ伺服器");
Reconnect();
}
private static void Reconnect()
{
try
{
//清除連接配接及頻道
Cleanup();
var mres = new ManualResetEventSlim(false); // state is initially false
while (!mres.Wait(3000)) // loop until state is true, checking every 3s
{
try
{
//連接配接
Connect();
mres.Set(); // state set to true - breaks out of loop
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ嘗試連接配接RabbitMQ伺服器出現錯誤:" + ex.Message, ex);
}
}
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ嘗試重新連接配接RabbitMQ伺服器出現錯誤:" + ex.Message, ex);
}
}
/*-------------------------------------------------------------------------------------*/
/// <summary>
/// Direct路由,發送消息至服務端
/// </summary>
/// <param name="exchangeName">交換機名稱</param>
/// <param name="routingKey">RoutingKey</param>
/// <param name="queueName">隊列名稱</param>
/// <param name="message">消息内容</param>
/// <returns></returns>
public bool DirectExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message)
{
try
{
if (Connection == null || !Connection.IsOpen) throw new Exception("連接配接為空或連接配接已經關閉");
if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道為空或通道已經關閉");
//建立一個持久化的隊列
bool queueDurable = true;
string QueueName = queueName;
string ExchangeName = exchangeName;
string RoutingKey = routingKey;
//聲明交換機
SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Direct);
//聲明隊列
SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
//路由綁定隊列
SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null);
//設定消息持久性
IBasicProperties props = SendChannel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;//持久性
//消息内容轉碼,并發送至伺服器
var messageBody = System.Text.Encoding.UTF8.GetBytes(message);
SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody);
//等待确認
return SendChannel.WaitForConfirms();
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex);
return false;
}
}
/// <summary>
/// Fanout路由,發送消息至服務端
/// </summary>
/// <param name="exchangeName">交換機名稱</param>
/// <param name="routingKey">RoutingKey</param>
/// <param name="queueName">隊列名稱</param>
/// <param name="message">消息内容</param>
/// <returns></returns>
public bool FanoutExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message)
{
try
{
if (Connection == null || !Connection.IsOpen) throw new Exception("連接配接為空或連接配接已經關閉");
if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道為空或通道已經關閉");
//建立一個持久化的頻道
bool queueDurable = true;
string QueueName = queueName;
string ExchangeName = exchangeName;
string RoutingKey = routingKey;
//聲明交換機
SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Fanout);
//聲明隊列
SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
//路由綁定隊列
SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null);
//設定消息持久性
IBasicProperties props = SendChannel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;//持久性
//消息内容轉碼,并發送至伺服器
var messageBody = System.Text.Encoding.UTF8.GetBytes(message);
SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody);
//等待确認
return SendChannel.WaitForConfirms();
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex);
return false;
}
}
/// <summary>
/// Topic路由,發送消息至服務端
/// </summary>
/// <param name="exchangeName">交換機名稱</param>
/// <param name="routingKey">RoutingKey</param>
/// <param name="queueName">隊列名稱</param>
/// <param name="message">消息内容</param>
/// <returns></returns>
public bool TopicExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message)
{
try
{
if (Connection == null || !Connection.IsOpen) throw new Exception("連接配接為空或連接配接已經關閉");
if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道為空或通道已經關閉");
//建立一個持久化的頻道
bool queueDurable = true;
string QueueName = queueName;
string ExchangeName = exchangeName;
string RoutingKey = routingKey;
//聲明交換機
SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
//聲明隊列
SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
//路由綁定隊列
SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null);
//設定消息持久性
IBasicProperties props = SendChannel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;//持久性
//消息内容轉碼,并發送至伺服器
var messageBody = System.Text.Encoding.UTF8.GetBytes(message);
SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody);
//等待确認
return SendChannel.WaitForConfirms();
}
catch (Exception ex)
{
LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex);
return false;
}
}
/// <summary>
/// Topic路由,接收同步消息
/// </summary>
/// <param name="queueName">監聽的隊列</param>
public void SyncDataFromServer(string queueName)
{
try
{
//設定參數,友善重新開機RabbitMQ伺服器時處理
MC_SyncDataConsume = queueName;
RabbitmqMessageConsume(MC_SyncDataConsume);
}
catch (Exception ex)
{
LogHandler.WriteLog("TopicExchangeConsumeMessageFromServer運作錯誤:" + ex.Message, ex);
throw ex;
}
}
private static void RabbitmqMessageConsume(string queueName)
{
try
{
if (Connection == null || !Connection.IsOpen) throw new Exception("連接配接為空或連接配接已經關閉");
if (ListenChannel == null || !ListenChannel.IsOpen) throw new Exception("通道為空或通道已經關閉");
bool queueDurable = true;
string QueueName = queueName;
//在MQ上定義一個持久化隊列,如果名稱相同不會重複建立
ListenChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
//輸入1,那如果接收一個消息,但是沒有應答,則用戶端不會收到下一個消息
ListenChannel.BasicQos(0, 1, false);
//建立基于該隊列的消費者,綁定事件
var consumer = new EventingBasicConsumer(ListenChannel);
//回應消息監控
consumer.Received += SyncData_Received;
//綁定消費者
ListenChannel.BasicConsume(QueueName, //隊列名
false, //false:手動應答;true:自動應答
consumer);
Common.LogHandler.WriteLog("開始監控RabbitMQ伺服器,隊列" + QueueName);
}
catch (Exception ex)
{
throw ex;
}
}
private static void SyncData_Received(object sender, BasicDeliverEventArgs e)
{
try
{
//TOOD 驗證程式退出後消費者是否退出去了
var body = e.Body; //消息主體
var message = Encoding.UTF8.GetString(body);
LogHandler.WriteLog("[x] 隊列接收到消息:" + message.ToString());
//處理資料
bool processSuccessFlag = new SyncDataHandler().ProcessSyncData(message);
if (processSuccessFlag)
{
//回複确認
ListenChannel.BasicAck(e.DeliveryTag, false);
}
else
{
//未正常處理的消息,重新放回隊列
ListenChannel.BasicReject(e.DeliveryTag, true);
}
}
catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex1)
{
Thread.Sleep(5000);
ListenChannel.BasicNack(e.DeliveryTag, false, true);
}
catch (Exception ex)
{
Thread.Sleep(5000);
ListenChannel.BasicNack(e.DeliveryTag, false, true);
}
}
}
}