天天看點

mqtt session保持 訂閱消息_MQTT系列入門介紹

寫在前面:        

最近參與了公司的物聯網項目開發,是以花了幾天時間研究了一下MQTT協定相關的内容,也查閱了相關的資料和開源中間件的源碼。在學習過程中,我發現目前網上這部分内容的資料不多并且内容較為零散。是以就有了寫這個系列的想法,一方面想系統的梳理下MQTT的相關知識和設計思想,友善日後檢視,另一方面如果有正在開發相關業務的小夥伴,也可以作為參考,不至于一頭霧水。目前這個系列會涵蓋

  1. MQTT協定内容的基本概述以及入門使用
  2. Spring-intergration整合Eclipse.paho作為MQTTClient實作消費訂閱消息功能
  3. Eclipse.paho内部消息流轉的執行流程及源碼解析
  4. 使用過程中遇到的問題彙總

這個是我寫的第一篇公衆号文章,關于内容有很多也是我自己通過閱讀官方文檔和檢視源代碼得到的了解。肯定會有存在有偏差的問題出現,歡迎小夥伴們随時留言指正。下面我們進入正題

mqtt session保持 訂閱消息_MQTT系列入門介紹
mqtt session保持 訂閱消息_MQTT系列入門介紹

一. 簡述

目前市面上流行的消息協定很多,例如基于Java平台的JMS協定,基于應用層标準的AMQP進階消息隊列協定,我們熟悉的RabbitMQ消息服務中間件就是基于AMQP協定的ERlang實作(之後我們搭建的MQTT消息服務也會基于RabbitMQ來實作,這個後面詳細說)。那麼MQTT協定到底是什麼呢?下面我們來看一下。 MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協定)是一種基于釋出/訂閱(publish/subscribe)模式的"輕量級"通訊協定,該協定建構于TCP/IP協定上,由IBM在1999年釋出。MQTT最大優點在于,可以以極少的代碼和有限的帶寬,為連接配接遠端裝置提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協定,使其在物聯網、小型裝置、移動應用等方面有較廣泛的應用。

mqtt session保持 訂閱消息_MQTT系列入門介紹

MQTT消息模型

通過上面的圖示,我們可以很清晰的了解到MQTT采用的也是釋出/訂閱這種方式。與AMQP協定類似,它本身支援消息通過生産端發起,推送到消息服務,消費端通過訂閱的方式統一消費的模式。消費端可以根據自己的需要通過訂閱不同的主題(topic)來消費對應的消息。這種類似“推模式”的方式既可以保證消息的及時性,也可以減小消費端和Broker之間的通信壓力。

二、協定原理

2.1 協定的實作方式

實作MQTT協定需要用戶端和伺服器端通訊完成。在通訊過程中,MQTT協定中有三種身份

  • 釋出者(Publish)
  • 代理(Broker)(伺服器)
  • 訂閱者(Subscribe)

其中,消息的釋出者和訂閱者都是用戶端,消息代理是伺服器,消息釋出者可以同時是訂閱者。

MQTT傳輸的消息分為兩個部分

  • 主題(Topic)
  • 負載(payload)

2.2 MQTT用戶端

一個使用MQTT協定的應用程式或者裝置,它總是建立到伺服器的網絡連接配接。用戶端可以實作下面的功能

  1. 釋出其他用戶端可能會訂閱的資訊
  2. 訂閱其它用戶端釋出的消息
  3. 退訂或删除應用程式的消息
  4. 斷開與伺服器連接配接

2.3 MQTT伺服器

MQTT伺服器也稱為"消息代理"(Broker),可以是一個應用程式或一台裝置。它是位于消息釋出者和訂閱者之間,可以實作下面的功能

  1. 接受來自客戶的網絡連接配接
  2. 接受客戶釋出的應用資訊
  3. 處理來自用戶端的訂閱和退訂請求
  4. 向訂閱的客戶轉發應用程式消息

MQTT在通信時會建構底層網絡傳輸,它将建立用戶端到伺服器的連接配接,提供兩者之間的一個有序的、無損的、基于位元組流的雙向傳輸。當應用資料通過MQTT網絡發送時,MQTT會把與之相關的服務品質(QOS)和主題名(Topic)相關聯。

2.4 MQTT協定中的訂閱、主題、會話

  1. 訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務品質(QOS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每個會話中的每個訂閱都有一個不同的主題篩選器

    2. 會話(Session)

每個用戶端與伺服器建立連接配接後就是一個會話,用戶端和伺服器之間有狀态互動。會話存在于一個網絡之間,也可能在用戶端和伺服器之間跨越多個連續的網絡連接配接。

    3. 主題名(Topic Name)

連接配接到一個應用程式消息的标簽,該标簽與伺服器的訂閱相比對。伺服器會将消息發送給訂閱所比對标簽的每個用戶端。

    4. 主題篩選器(Topic Filter)

一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所比對到的多個主題。

    5. 負載

消息訂閱者所具體接收的内容。

2.5 MQTT協定中的方法

MQTT協定中定義了一些方法(也被稱為動作), 用于表示對确定資源所進行操作。這個資源可以代表預先存在的資料或動态生成資料,這取決于伺服器的實作。通常來說,資源指伺服器上的檔案或輸出。主要方法有:

  • Connect:等待與伺服器建立連接配接。
  • Disconnect:等待MQTT用戶端完成所做的工作,并與伺服器斷開TCP/IP會話。
  • Subscribe:等待完成訂閱。
  • UnSubscribe:等待伺服器取消用戶端的一個或多個topics訂閱。
  • Publish:MQTT用戶端發送消息請求,發送完成後傳回應用程式線程。

三、MQTT協定資料包結構

在MQTT協定中,一個MQTT資料包由以下三部分内容組成

  • 固定頭(Fixed header):存在于所有MQTT資料包中,表示資料包類型及資料包的分組類辨別。
  • 可變頭(Variable header):存在于部分MQTT資料包中,資料包類型決定了可變頭是否存在及其具體内容。
  • 消息體(Payload):存在于部分MQTT資料包中,表示用戶端收到的具體内容。

3. 1 MQTT固定頭

每個MQTT控制封包都包含一個固定報頭,固定頭部占用兩個位元組。其記憶體結構大概如下圖所示

mqtt session保持 訂閱消息_MQTT系列入門介紹

固定頭記憶體結構

3.1.1 控制封包類型

用來描述封包的類型,通常用于用戶端和服務端進行雙向通信時對封包進行分類,不同的封包類型處理的方式也不同。

mqtt session保持 訂閱消息_MQTT系列入門介紹

3.1.2 重發表示 DUP FLAG:

發送消息的副本。如果DUP标志被設定為0,表示這是用戶端或者服務端第一次請求發送這個PUBLISH封包。如果DUP标志被設定為1,表示這可能是一個早期封包請求的重發。但不能用于檢測消息重複發送

3.1.3 消息品質QOS(Quality of Service)

          發送者和接收者之間,對于消息傳遞的可靠程度的協商。目前有三種消息釋出的服務品質

  • QOS= 0:"至多一次",對于client而言,有且僅發一次publish包,對于broker而言,有且僅發一次publish。簡而言之,就是僅發一次包,是否收到完全不管,适合那些不是很重要的資料。
mqtt session保持 訂閱消息_MQTT系列入門介紹
  • QOS= 1:確定消息到達,但消息重複可能會發生。當broker收到client的publisher或者subscriber收到broker的publisher時,都會産生一條puback用來确認到達。當client沒收到broker的puback或者broker沒有收到client的puback,name就會一直發送publisher
mqtt session保持 訂閱消息_MQTT系列入門介紹
  • QOS= 2:確定消息到達一次。publisher和broker分别進行了緩存,其中publisher緩存了message和messageId,而broker緩存了messageId,兩方都做了記錄。是以可以保證消息不重複,但同時因為增加了兩次通信互動,對網絡帶寬和伺服器負載也會産生一些壓力
mqtt session保持 訂閱消息_MQTT系列入門介紹

注意:對于QOS2的情況,增加了一個PUBREL及PUBCOM的過程,同時broker端做了如下的特殊處理。

如果Publisher沒收到Broker的PUBRECV,Publisher會重發,但是對于之前一條message,Broker有兩種處理方式:

  1. message存在本地,先不給publish和subscriber
  2. 儲存messageId,把message publish給他的subscriber

對于第一種處理方式, 如果Publisher繼續重發,且被收到(ID相同),那在Broker端隻算一條message,繼續等Publisher發PUBREL 。 這樣broker就保證隻publish了一條message,而不是多條。 對于第二種處理方式, 如果sender繼續重發,且被收到,sever會檢查它的message ID,如果重發過來的message ID是之前存過的,broker就不會publish給他的subscriber,因為之前已經publish了,直接删除。 不同的消息服務針對QOS2的情況會選擇不同的處理方式。針對RabbitMQ而言,本身不支援QOS2的處理方式,會自動降級為QOS1進行處理。是以在這裡針對這個問題我們不展開讨論。

3.1.4 retain(保留消息)

        釋出保留辨別,表示伺服器要保留這次推送的資訊,如果有新的訂閱者出現,就把這消息推送給它,如果不設那麼推送至目前訂閱的就釋放了。這裡需要和消息積壓處理區分開,通常如果publisher生産一條消息到broker,如果訂閱的消費者此時與broker斷開連接配接,在cleanSession設定為true的情況下(這個屬性後面會詳細講),消費者重新連接配接後是無法收到這些在自己斷開時被生産的消息的。這與我們經常使用的AMQP有很大的不同。而保留消息的意思是生産端如果生産一條保留消息,這條消息會一直緩存在broker端,每次消費端斷開重連,都會收到這條消息。但是保留消息在broker有且僅有一條。也就是說,後發送的保留消息會覆寫掉先發送的保留消息。消費端連接配接成功後收到的保留消息,永遠是最後一條發送給broker的保留消息。

3.2 可變報頭

MQTT資料包中包含一個可變頭,它駐位于固定的頭和負載之間。可變報頭的結構會根據資料包類型的不同而不同。内容也會有相應的變化。這裡要注意下,網上大部分資料通常會把CONN類型或者Publish類型的消息可變頭作為通用的可變頭結構了解。其實不存在通用可變頭結構的概念,不同類型資料包,攜帶的資訊都是不一樣的。這裡因為篇幅關系,我就簡單介紹一下可變頭中比較重要的資訊屬性。

3.2.1 Clean Session

這裡就是剛才在介紹保留消息時提到的屬性。包含在CONN類型可變頭中,表示 如果訂閱的客戶機斷線了,那麼要儲存其要推送的消息,如果其重新連接配接時,則将這些消息推送。 1表示消除,表示客戶機是第一次連接配接,消息是以以前的連接配接資訊。

3.2.2 遺囑消息

當client發送一條CONN類型的封包給伺服器時,可以在可變報頭中包含遺囑消息。當client與broker斷開時,broker會主動向消費訂閱端推送這條遺囑消息。遺囑消息的發送條件包括

  • 服務端檢測到了一個I/O錯誤或者網絡故障。
  • 用戶端在保持連接配接(Keep Alive)的時間内未能通訊。
  • 用戶端沒有先發送DISCONNECT封包直接關閉了網絡連接配接。
  • 由于協定錯誤服務端關閉了網絡連接配接。

遺囑消息本質也是一條保留消息,也有自己的消息品質(QOS)。也就是說,如果遺囑消息被發送時,會作為最新的一條保留消息替換掉之前的保留消息。遺囑消息和保留消息配合使用可以作為裝置端通知控制端線上或者離線的通信手段,網上有很多解決方案,這裡不再贅述。

3.2.3 主題名(Topic Name)

用于識别有效載荷資料應該被釋出到哪一個資訊通道。存在于PUBLISH(QOS>0時), PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCIBE,UNSUBACK類型的封包中。 這裡順便說明一下Topic的過濾器,也就是所謂的主題過濾器的比對規則。 我們先來簡單回顧下,使用過RabbitMQ消息服務的小夥伴應該都了解,在AMQP協定下, 當使用 publisher/subscriber模式時,可以使用topic模式模糊監聽消息隊列。例如,producter發送一條“hello world”的消息到指定路由,routingKey為mqtt.abc.def。此時有兩個consumer同時在消費,consumerA監聽隊列queue1,bindingkey為mqtt.#,consumerB監聽隊列queue2,bindingKey為mqtt.*,此時收到消息的應該是comsumerA,因為consumerA的bindingKey格式為mqtt.#,即以mqtt開頭的routingkey無論有幾個分段辨別符(即多層分隔符),都會被監聽到。而consumerB隻能監聽到單層分隔符。這是一些最基本的知識,但是我為什麼要先說這個呢?原因就在于mqtt的主題過濾器與amqp協定下的主題過濾器非常類似,差別就在于把 "." 更換成了 “#”。 舉個栗子 : 如果用戶端訂閱主題 “sport/tennis/player1/#”,它會收到使用下列主題名釋出的消息:

sport/tennis/player1sport/tennis/player1/rankingsport/tennis/player1/score/wimbledon
           

值得注意的是:“sport/#”也比對單獨的 “sport” ,因為 # 包括它的父級。并且#隻能放在訂閱主題名稱的/後,并且在末尾的位置,否則比對無效。例如

sport/tennis#sport/tennis/#/ranking
           

上面兩種監聽方式都是無效的 !!

對應AMQP的單層通配符 “*”,MQTT協定使用的是 “+”,并且+隻能用于單層通配符。在主題過濾器的任意層級都可以使用單層通配符,包括第一個和最後一個層級。然而它必須占據過濾器的整個層級。可以在主題過濾器中的多個層級中使用它,也可以和多層通配符一起使用。

再舉個栗子

  • “+” 是有效的。
  • “+/tennis/#” 是有效的。
  • “sport+” 是無效的。
  • “sport/+/player1” 也是有效的。
  • “/finance” 比對 “+/+” 和 “/+” ,但是不比對 “+”。

另外:服務端不能将 $ 字元開頭的主題名比對通配符 (#或+) 開頭的主題過濾器

  • 訂閱 “#” 的用戶端不會收到任何釋出到以 “$” 開頭主題的消息。
  • 訂閱 “+/monitor/Clients” 的用戶端不會收到任何釋出到 “$SYS/monitor/Clients” 的消息。
  • 訂閱 “$SYS/#” 的用戶端會收到釋出到以 “$SYS/” 開頭主題的消息。
  • 訂閱 “$SYS/monitor/+” 的用戶端會收到釋出到 “$SYS/monitor/Clients” 主題的消息。
  • 如果用戶端想同時接受以 “$SYS/” 開頭主題的消息和不以 $ 開頭主題的消息,它需要同時訂閱 “#” 和 ““$SYS/#”。

除此之外:主題名和主題過濾器還必須符合下列規則

  • 所有的主題名和主題過濾器必須至少包含一個字元 [MQTT-4.7.3-1]。
  • 主題名和主題過濾器是區分大小寫的。
  • 主題名和主題過濾器可以包含空格。
  • 主題名或主題過濾器以前置或後置斜杠 “/” 區分。
  • 隻包含斜杠 “/” 的主題名或主題過濾器是合法的。
  • 主題名和主題過濾器不能包含空字元 
  • 主題名和主題過濾器是UTF-8編碼字元串,它們不能超過65535位元組 

3.2.4 封包辨別符 Packet Identifier

封包辨別符用來區分封包,特别是在重發的封包中用來辨別是否是同一個封包,并在需要應答的場景中用于确定是對哪個發送封包的應答。可變報頭的封包辨別符(Packet Identifier)字段存在于在多個類型的封包裡。 用戶端每次發送一個新的這些類型的封包時都必須配置設定一個目前未使用的封包辨別符 。如果一個用戶端要重發這個特殊的控制封包,在随後重發那個封包時,它必須使用相同的辨別符。 當用戶端處理完這個封包對應的确認後,這個封包辨別符就釋放可重用 。QOS1的PUBLISH對應的是PUBACK,QOS2的PUBLISH對應的是PUBCOMP。 QOS設定為0的PUBLISH封包不能包含封包辨別符。 值得注意的是,假設用戶端發送辨別符為0x1234的Publish封包,它有可能會在收到那個封包的PUBACK之前,先收到服務端發送的另一個消息不同但是封包辨別符相同的Publish封包。但隻要保證在一條消息中Publish和PubACK有唯一的映射關系就可以

3.3 有效負載(payload)

Payload消息體是MQTT資料包的第三部分。包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四種類型的消息。也就是說,這四種控制封包的消息體是必須要有的

  • CONNECT:消息體内容主要是:用戶端的ClientID、訂閱的Topic、Message以及使用者名和密碼。
  • SUBSCRIBE:消息體内容是一系列的要訂閱的主題以及QOS。
  • SUBACK:消息體内容是伺服器對于SUBSCRIBE所申請的主題及QOS進行确認和回複。
  • UNSUBSCRIBE:消息體内容是要訂閱的主題。

而publish是消息體中則儲存推送的消息,以二進制形式,當然這裡的編輯可自定義。

 以下是各個控制封包對消息體的規則

mqtt session保持 訂閱消息_MQTT系列入門介紹

以上就是MQTT協定中的較為常用和核心的内容,當然還有一些其他邊邊角角的東西。例如各個控制封包之間資料結構的異同,連接配接方式,安全認證等。這些内容都可以在官方文檔中找到,感興趣的小夥伴可以直接參考官方文檔

傳送門:http://mqtt.org/

結束語

本章節主要介紹了MQTT協定的基本構成,通信方式,控制封包格式以及資料包的組成部分。下一章會從代碼的角度基于Spring-intergration和Eclipse.Paho中間件和RabbitMQ消息服務搭建一套完整的消息發送接收功能。敬請期待~~~