天天看點

工業物聯網或系統內建中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析第一章     終端/互動場景第二章     ActvieMQ應用場景第三章     假定場景分析

1. [連載]《C#通訊(序列槽和網絡)架構的設計與實作》 2.[ 開源]C#跨平台物聯網通訊架構ServerSuperIO(SSIO)介紹 2. 應用SuperIO(SIO)和開源跨平台物聯網架構ServerSuperIO(SSIO)建構系統的整體方案 3. C#工業物聯網和內建系統解決方案的技術路線(資料源、資料采集、資料上傳與接收、ActiveMQ、Mongodb、WebApi、手機App) 5.ServerSuperIO開源位址: https://github.com/wxzz/ServerSuperIO

目       錄

工業物聯網或系統內建中應用消息隊列(ActiveMQ)的場景全面分析... 1

前言... 1

第一章           終端/互動場景... 3

1.1           終端裝置... 3

1.2           通訊機制... 3

第二章           ActvieMQ應用場景... 4

2.1           釋出/訂閱(Publish/Subscribe)... 4

2.2           生産者/消費者(Producer/Consumer)... 7

2.3           請求/應答(Request/Response)... 10

第三章           假定場景分析... 16

3.1           通訊層... 16

3.2           資料業務層... 16

3.3           綜述... 16

前言

     網際網路技術已經發展的很成熟了,各種開源的代碼、架構和解決方案等。鑒于網際網路技術的通用性,勢必向其他領域延展。不管是工業4.0,還是網際網路+  工業,網際網路技術向工業領域傳導也是必然的。

     是以,對于工業方面的應用場景的技術儲備和技術線路調研也是日常工作很重要的一部分,為公司的橫向和縱向發展提供技術平台和保障,當然也取決于上司的視野。

第一章     終端/互動場景

    任何技術都是為業務服務,而業務是有特定的應用場景。離開了實作環境去談技術是沒有實際意義的,解決實際問題而又能保證相當長時間内的穩定性是我們努力實作的目标。同時要從多個角度來考慮問題,以及做出平衡。

1.1    終端裝置

(1)    終端種類:嵌入式硬體/傳感器、PC機(監測站、大型監控裝置等)、手機終端等。

(2)    互動方式:單向互動,資料上傳,可能服務端會有傳回确認資訊,證明資料已經收到了;雙向互動,服務端不僅僅會傳回确認資訊,同時還要主動下發給指定終端指令資訊,例如:控制硬體裝置機械動作指令、修改硬體裝置參數指令、以及補傳相關資料資訊指令等等。

(3)    裝置管理:這裡指的裝置管理是說裝置的狀态,包括兩個方面:裝置IO狀态和裝置通訊狀态。裝置IO狀态包括:IO打開和IO關閉。裝置通訊狀态包括:通訊中斷、通訊幹擾和通訊正常。為了判斷故障,這裡的邏輯關系是:IO打開的時候不一定代表通訊正常;IO關閉不一定代表通訊中斷;通訊中斷不一定代表IO關閉;通訊幹擾不一定代表IO打開。

(4)    資料完整性:允許資料缺失,一般在原來資料基礎上的增量資料是可以允許丢失的;不允許資料缺失,一般脈沖資料是不允許資料丢失的。

1.2    通訊機制

(1)主動請求資料:伺服器端主動下發指令給終端,讓誰上傳資料、上傳什麼資料都由伺服器端決定。

(2)被動接收資料:伺服器端被動接收終端上傳的資料,根據資料資訊進行資料處理,以及傳回确認資訊。

第二章     ActvieMQ應用場景

     消息隊列比較多,本文以ActiveMQ為例進行介紹,全部代碼實作C#為主,主要考慮到常見的應用模式。事例代碼下載下傳:

http://pan.baidu.com/s/1qXZ1sU4

2.1    釋出/訂閱(Publish/Subscribe)

     一個資訊釋出者在某一個主題上釋出消息,所有訂閱該主題的訂閱都會收到相同的消息,這種模式是一對多的關系,如下圖:

工業物聯網或系統內建中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析第一章     終端/互動場景第二章     ActvieMQ應用場景第三章     假定場景分析

釋出端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Topic"));

                        string text = Console.ReadLine();
                        while (text!="exit")
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = text;
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
        }
      

 訂閱端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.ClientId = "testing listener1";
                    connection.Start();

                    using (ISession session = connection.CreateSession())
                    {
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Topic"), "testing listener1", null, false);
                        consumer.Listener += new MessageListener(consumer_Listener);
                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
      

2.2    生産者/消費者(Producer/Consumer)

    生産者生産了一塊香皂,消費者購買了該塊香皂,使用完了,就在這個世界上消息了,生産者和消費者之間的關系存在一種偶然性,這是一對一的關系,如下圖:

工業物聯網或系統內建中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析第一章     終端/互動場景第二章     ActvieMQ應用場景第三章     假定場景分析

生産端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("Queue"));

                        string text = Console.ReadLine();
                        while (text != "exit")
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = text;
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
        }
      

 消費端代碼:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    //connection.ClientId = "testing listener2";
                    connection.Start(); 
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("Queue"));
                        consumer.Listener += new MessageListener(consumer_Listener);
                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
      

2.3    請求/應答(Request/Response)

請求-應答的通信方式應用很普遍,用戶端向服務端上傳實時資料或參數,服務端處理完之後,要傳回确認資訊,這種互動關系如下圖:

工業物聯網或系統內建中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析第一章     終端/互動場景第二章     ActvieMQ應用場景第三章     假定場景分析

用戶端代碼:

static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
            try
            {
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.Start();
                    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        IDestination destination =  new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("client.messages");

                        IMessageProducer producer = session.CreateProducer(destination);
                        producer.DeliveryMode=MsgDeliveryMode.NonPersistent;

                        IDestination tempDest = session.CreateTemporaryQueue();
                        IMessageConsumer responseConsumer = session.CreateConsumer(tempDest);
                        responseConsumer.Listener += new MessageListener(consumer_Listener);
                       
                        string text = Console.ReadLine();
                        while (text != "exit")
                        {
                            ITextMessage msg = session.CreateTextMessage();
                            msg.Text = text;
                            msg.NMSReplyTo = tempDest;
                            msg.NMSCorrelationID = DateTime.Now.ToString("yyyyMMddHHmmss");
                            producer.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }

                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
      

 服務端代碼:

private static ISession session;

        private static IMessageProducer replyProducer;
        static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
            try
            {
                    IConnection connection = factory.CreateConnection();
                    connection.Start();
                    session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

                    IDestination adminQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("client.messages");
                    replyProducer = session.CreateProducer();
                    replyProducer.DeliveryMode=MsgDeliveryMode.NonPersistent;

                    IMessageConsumer consumer = session.CreateConsumer(adminQueue);
                    consumer.Listener += new MessageListener(consumer_Listener);
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage response = session.CreateTextMessage();
                if (message is ITextMessage) {
                    ITextMessage txtMsg = (ITextMessage)message;
                    string messageText = txtMsg.Text;
                    response.Text = messageText;

                    Console.WriteLine("Receive:" + messageText);
                }

                response.NMSCorrelationID=message.NMSCorrelationID;

                replyProducer.Send(message.NMSReplyTo, response);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);

            }
        }
      

第三章     假定場景分析

     我們以系統建設過程中的構架來分析消息隊列在兩個層面的問題,通訊層和資料業務層。

3.1    通訊層

     通訊層是否可以用消息隊列(ActiveMQ)?這個問題取決于兩方面:1、如果終端裝置有嵌入式硬體,甚至還是C51開發的,那麼在系統內建和物聯的過程中,就涉及到相容性的問題。顯然和消息隊列進行對接是一件頭痛的事,用C51寫一個對接的驅動不是不可能,但是要評估工作量和穩定性。2、服務端與指定某個終端雙向互動頻繁的情況,特别是服務端實時發送裝置校準指令的情況,這種情況消息隊列是不如通訊架構的。

3.2    資料業務層

     服務端接收到資料後,完全可以使用消息隊列的生産者和消費者模式處理資料,對系統的業務進行解耦。

     下發指令也可以通過消息隊列,這樣可以統一控制端的接口,再由通訊架構下發到指定的終端。

3.3    綜述

     綜合考慮,建議在通訊層使用通訊架構,對于裝置的IO狀态和通訊狀态能夠及時反應,通訊效率也是能夠得到保障的;對于資料業務層,建議不要放在通訊架構内部進行處理,可以使用消息隊列,配合通訊架構使用。

    整體架構圖如下:

工業物聯網或系統內建中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析第一章     終端/互動場景第二章     ActvieMQ應用場景第三章     假定場景分析

文章得到了群友支援:

工業物聯網或系統內建中應用消息隊列(ActiveMQ,C#的demo)的場景全面分析第一章     終端/互動場景第二章     ActvieMQ應用場景第三章     假定場景分析