天天看點

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

(使用java用戶端)

一、概述

在前面的專題學習中,我們建立了一個工作隊列,在工作隊列中假如每個任務交給一個确定的工作者,不管是生産者還是消費者都必須知道一個指定的隊列名稱才能發送和接收消息,而rabbitmq消息模型的核心思想就是生産者不會将消息直接發送給隊列。

因為生産者通常不會知道消息将會被哪些消費者接收,生産者的消息雖然不是直接發送給queue(隊列),但是消息會交給exchange(交換機),是以需要定義exchange的消息分發模式來實作消息的分發,這便是這部分專題學習中我們将要學習的釋出者/訂閱者模式,這樣實作了消息生産者和消息消費者之間的解耦。

在前面的專題學習中實作簡單消息傳遞和工作隊列中有如下一行代碼:

在上述代碼中第一個是空字元串其實就是exchangename,這裡用了空字元串,就表示消息會交給預設的exchange。

為了說明這種消息分發模型,我們将建構一個簡單的日志記錄系統,它包括兩個程式--第一個程式用來發送日志消息,第二個程式用來接收列印這些日志消息。

在日志記錄系統運作的每個接收者都将接收到消息,這樣我們可以運作一個接收者将消息輸出到控制台。

總的原則:發送的日志消息将被廣播到所有的接收者。

二、日志消息系統的實作

2.1、exchange(交換機)

之前發送和接收消息都是通過一個隊列來實作,現在是時候介紹下一個完整的rabbitmq的消息傳遞模型了。

首先來對之前學習的消息傳遞加深一下映象

>一個生産者是一個用于發送消息的應用

>一個隊列是存儲消息的緩沖區

>一個消費者是一個接收消息的應用。

在前面已經提到了rabbitmq的核心思想是:生産者從來不需要直接發送任何消息到隊列中,實際上通常生産者甚至不知道消息江北發送到任何一個隊列中。

相反,生産者隻能發送消息到一個交換元件中(exchange),exchange是一個很簡單的東西,一方面它接收來自生産者的消息,另外一方面它将把來自生産者的消息放入到隊列中,exchange必須知道怎麼接收一個消息,而且接收的消息應該被添加到一個指定的隊列?還是多個隊列中,或者接收的消息被丢棄,這個規則被exchange所定義,它的結構如下:

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-1

exchange有如下幾種定義類型:direct、topic、headers、fanout,每種類型都自己的實作方式和消息分發機制,在此我們将重點放在最後一種類型:fanout,首先建立一個這種類型的交換。

基于fanout的exchange是非常簡單的,正如它的名字一樣,我們能猜到它的具體實作,它不僅僅廣播各種來着生産者的消息到它所知道的所有隊列中,這正是日志記錄系統的所需要的。

2.2、交換清單(exchange list)

為了列出伺服器中所有的exchanges(交換機),我們通過運作rabbitmqctl來實作,在列出的清單中有一些amp.*changes和沒有定義名稱的exchange(預設),這些是被伺服器預設建立的,但是這些當我們需要使用的時候是不可用的。

在之前不知道關于exchange的任何東西,但是它仍然能夠發送消息到隊列,這可能因為是使用了預設的exchange,因為我們定義一個空的串("")。

之前釋出的消息:

第一個參數就是exchange的名稱,空的字元串表示預設或者是無名的exchange,消息被路由到指定的routingkey名稱的隊名,加入它存在的話。

2.3、臨時隊列(temporary queues)

在之前我們使用的隊列都是被定義過特殊的名稱(hello和task_queue),對于rabbitmq來說命名一個隊列是至關重要的,當你想在生産者和消費者中分享隊列的時候,給一個隊列的名稱是必須的。

但是那些都不是日志記錄系統所需要的,我們希望能夠獲得所有的日志資訊,而不隻是其中的一部分,而且我們隻對目前正在傳遞的資訊感興趣,對舊的日志資訊不感興趣,要解決這些問題,我們需要分兩個步驟:

首先當我們連結到rabbitmq伺服器的時候,需要一個新的、空的隊列,為了做到這點,可以建立一個随機名的隊列,或者更好的方法就是讓伺服器選擇一個随機的隊列名。

其次,當斷開與隊列的連接配接時,消費者應該被自動删除掉。

在java用戶端,我們通過一個無參數的queuedeclare()方法為我們建立一個非持久的、唯一的、能自動删除的隊列與隊列名稱

在這一點上queuename包含一個随機隊列名稱,比如它可能看起來像amq.gen-jzty20brgko-hjmujj0wlg.的随機串。

2.4、綁定(bindings)

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-2

我們已經建立了一個fanout exchange和一個隊列,現在我們需要告訴exchange去發送消息到隊列中,exchange和隊列之間的關系被稱為一個綁定(binding)。

從現在開始我們從logs exchange将被添加消息到隊列中,使用rabbitmqctl list_bingdins能列出所有的綁定。

2.5、釋出者/訂閱者實作(putting it all together)

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-3

生産者代碼和之前的發送消息的代碼并沒有太大的差別,最重要的變化是,我們現在要将釋出的消息傳遞給logs exchange來代替無名的exchange(之前的是""),在發送消息時需要提供一個routingkey,它對于fanout exchange是非常重要的,不能被忽視的,這裡的emitlog.java代碼如下:

發送

接收

2.6、測試釋出者/訂閱者

操作步驟:

1、運作多個receivelogs,分别記為01、02、03、04,首先執行前三個接收者,如下圖所示

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-4

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-5

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-6

2、運作emitlog.java,此時可以看到上述三個接收者都能接收消息

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-7

3、執行receivelogs04,此時它沒有收到消息。

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-8

4、再次執行emitlog.java,此時可以看到所有的接收者都接收到了消息。

RabbitMQ (消息隊列)專題學習04 Publish/Subscribe(釋出者/訂閱者)

圖-9

說明exchange在接收到生産者的消息後,會将消息發送到目前已經與它綁定了的所有的queue中,在接收者完消息之後,rabbitmq将隊列中的消息移除。

源碼下載下傳