天天看點

分布式消息通信隊列——rabbitmq

對于傳統的Linux服務中,各子產品間通信我們可以通過程序間通信、線程間通信的機制實作。但對于分布式系統中,其架構中含有多個元件、多個子系統,而且各元件/子系統可能是分布在不同節點上,是以要如何實作它們之間的通信呢?

第一種方式:IPC

IPC通信的缺點是其各子產品耦合性太大,不适合擴充

第二種方式:Socket

socket是一種常用的通信方式,是開發中常用的一種手段,但它需要維持連結不中斷,而且發送者/接收者耦合度大,并且需要考慮接收者優先級等問題。

是以分布式消息通信機制在分布式系統中是一種常用的方式,其協定AMQP,對于分布式異步通信是一種很好的方式。本文以rabbtmq為例。rabbitmq也是作為OpenStack雲平台中,各元件中間通信的中間件,實作OpenStack各子產品間的異步通信。

推薦一個rabbitmq模拟學習的網址:tryrabbitmq

1. rabbitmq簡介

分布式消息通信隊列——rabbitmq

1. RabbitMQ Server的角色就是維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。

2. Client A & B: 也叫Producer,資料的發送方。一個Message有兩個部分:payload(有效載荷)和label(标簽)。payload顧名思義就是傳輸的資料。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

3. Client 1,2,3:也叫Consumer,資料的接收方。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,隻有payload,label已經被删掉了。對于Consumer來說,它是不知道誰發送的這個資訊的。

2. rabbitmq的通信方式:

Producter和consumer消息傳遞方式:exchanges, queues and bindings。

Exchanges : are where producers publish their messages.

Queues :are where the messages end up and are received by consumers

Bindings :are how the messages get routed from the exchange to particular queues.

Producer和Consumer都是通過TCP連接配接到RabbitMQ Server的。

Connection: 就是一個TCP的連接配接。Producer和Consumer都是通過TCP連接配接到RabbitMQ Server的。以後我們可以看到,程式的起始處就是建立這個TCP連接配接。

Channels: 虛拟連接配接。它建立在上述的TCP連接配接中。資料流動都是在Channel中進行的。也就是說,一般情況是程式起始建立TCP連接配接,第二步就是建立這個Channel。

如果message有訂閱者(consumer subscribe),當message到queue,會立即被發送給consumer,當consumer正确接收後,queue删掉message;

如果message沒有訂閱者,則message到達queue後,會被轉到cache,而不會被丢棄。

正确收到:consumer傳回ack

Consumer reject a message後,rabbitMQ server兩種處理方法:

1、 Reject後rabbitMQ server發給下一個consumer

2、 rabbitMQ server删掉message

prudocer : 消息的發送者

queue(broker): 在應用之間傳輸的message實際上在queue中。其本質上是一塊大記憶體。是producer把message發給queue,然後consumer從queue中取message

consumer: 等待message的應用

producer、consumer和broker不必一定在同一host上。

如果producer把message發給一個不存在的queue,rabbitMQ隻會丢掉消息(drop message)

routing_key參數:queue的名字,指定message經過exchange後應該發給哪個queue。

如果有多個consumer/worker時,producer發出來的message将會被consumer輪詢消費。

Q: 如果consumer/worker對message處理要有一定的時間,在這段時間内,message還沒處理完,但consumer/worker挂掉了或者被幹掉了,會發生什麼?

A:

1. 沒有ack的情況下:consumer不傳回ack

在目前的rabbitMQ代碼版本(version 0.11.0)中,當message被發送給consumer後,馬上被标記為deletion。當worker被幹掉後,這個正在處理的message會丢失,同時所有發給這個worker的、還沒出來的的message都會丢掉。

2. 有ack情況下:consumer如果正常處理message,則傳回ack

如果沒有收到ack,則表示consumer沒完全正确處理message,rabbitMQ會重新re-queue改消息,再将其發給其他consumer。(timeout?)

聲明exchange、queue等時,使用channel.exchange_declare()或channel.queue_declare()方法,參數解析:

1. durable:持久的,即當rabbitMQ重新開機之後,是否會丢失,當exchange或queue聲明時加入durable=true,則rabbitMQ重新開機後,exchange和queue仍存在。Eg:channel.exchange_declare(exchange='web', durable=True)

2. exclusive:當exclusive=true時,consumer連接配接關閉時,queue可檢測到。Eg:result = channel.queue_declare(exclusive=True),沒有指定queue名字,系統将自動随機生成一個名字。

exchange:一邊從producer接收message,然後另一邊将message發給queue。Exchange需要知道應該對message做什麼處理:是發給某個queue,或是發給多個queue,亦或是丢棄message,exchange type定義了這些規則。

Exchange type的類型有:direct、topic、headers和fanout

• fanout: 廣播,所有訂閱到這個exchange的queue都會接收到message。

• direct: 可以根據routing_key定向選擇message。在binding時,通過加入routing_key參數:channel.queue_bind(routing_key=XXX),然後釋出message時,在channal.basic_publish(routing_key=XXX)來實作兩者對應。

• topic: 也是通過比對routing_key來選擇queue。Topic的routing_key包含一系列關鍵詞,每個關鍵詞用“.”分開,有兩個特性:*代表一個詞,#代表零個或多個詞。如word1.word2.word3

如果隻有#,則topic變成fanout。

如果沒有*和#,則topic就是direct。

ref:

消息隊列在分布式系統中的應用 https://blog.csdn.net/Jmilk/article/details/78642316

快速入門分布式消息隊列之 RabbitMQ(1) https://blog.csdn.net/jmilk/article/details/78705281

快速入門分布式消息隊列之 RabbitMQ(2) https://blog.csdn.net/Jmilk/article/details/78712632

快速入門分布式消息隊列之 RabbitMQ(3) https://blog.csdn.net/jmilk/article/details/78767168 

繼續閱讀