在上篇文章中,我們解決了從發送端(Producer)向接收端(Consumer)發送“Hello World”的問題。在實際的應用場景中,這是遠遠不夠的。從本篇文章開始,我們将結合更加實際的應用場景來講解更多的進階用法。
當有Consumer需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance每個Consumer的load。試想一下,對于web application來說,在一個很多的HTTP request裡是沒有時間來處理複雜的運算的,隻能通過背景的一些工作線程來完成。接下來我們分布講解。
應用場景就是RabbitMQ Server會将queue的Message分發給不同的Consumer以處理計算密集型的任務:
1. 準備
在上一篇文章中,我們簡單在Message中包含了一個字元串"Hello World"。現在為了是Consumer做的是計算密集型的工作,那就不能簡單的字元串了。在現實應用中,Consumer有可能做的是一個圖檔的resize,或者是pdf檔案的渲染或者内容提取。但是作為Demo,還是用字元串模拟吧:通過字元串中的.的數量來決定計算的複雜度,每個.都會消耗1s,即sleep(1)。
還是複用上篇文章中的code,根據“計算密集型”做一下簡單的修改,為了辨識,我們把send.py 的名字換成new_task.py
[python] view plaincopy
- import sys
- message = ' '.join(sys.argv[1:]) or "Hello World!"
- channel.basic_publish(exchange='',
- routing_key='hello',
- body=message)
- print " [x] Sent %r" % (message,)
同樣的道理,把receive.py的名字換成worker.py,并且根據Message中的.的數量進行計算密集型模拟:
[python] view plaincopy
- import time
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
2. Round-robin dispatching 循環分發
RabbitMQ的分發機制非常适合擴充,而且它是專門為并發程式設計的。如果現在load加重,那麼隻需要建立更多的Consumer來進行任務處理即可。當然了,對于負載還要加大怎麼辦?我沒有遇到過這種情況,那就可以建立多個virtual Host,細化不同的通信類别了。
首先開啟兩個Consumer,即運作兩個worker.py。
Console1:
[python] view plaincopy
- shell1$ python worker.py
- [*] Waiting for messages. To exit press CTRL+C
Consule2:
[python] view plaincopy
- shell2$ python worker.py
- [*] Waiting for messages. To exit press CTRL+C
Producer new_task.py要Publish Message了:
[python] view plaincopy
- shell3$ python new_task.py First message.
- shell3$ python new_task.py Second message..
- shell3$ python new_task.py Third message...
- shell3$ python new_task.py Fourth message....
- shell3$ python new_task.py Fifth message.....
注意一下:.代表的sleep(1)。接着開一下Consumer worker.py收到了什麼:
Console1:
[python] view plaincopy
- shell1$ python worker.py
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'First message.'
- [x] Received 'Third message...'
- [x] Received 'Fifth message.....'
Console2:
[python] view plaincopy
- shell2$ python worker.py
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'Second message..'
- [x] Received 'Fourth message....'
預設情況下,RabbitMQ 會順序的分發每個Message。當每個收到ack後,會将該Message删除,然後将下一個Message分發到下一個Consumer。這種分發方式叫做round-robin。這種分發還有問題,接着向下讀吧。
3. Message acknowledgment 消息确認
每個Consumer可能需要一段時間才能處理完收到的資料。如果在這個過程中,Consumer出錯了,異常退出了,而資料還沒有處理完成,那麼非常不幸,這段資料就丢失了。因為我們采用no-ack的方式進行确認,也就是說,每次Consumer接到資料後,而不管是否處理完成,RabbitMQ Server會立即把這個Message标記為完成,然後從queue中删除了。
如果一個Consumer異常退出了,它處理的資料能夠被另外的Consumer處理,這樣資料在這種情況下就不會丢失了(注意是這種情況下)。
為了保證資料不被丢失,RabbitMQ支援消息确認機制,即acknowledgments。為了保證資料能被正确處理而不僅僅是被Consumer收到,那麼我們不能采用no-ack。而應該是在處理完資料後發送ack。
在處理資料後發送的ack,就是告訴RabbitMQ資料已經被接收,處理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下資料也不會丢失。
這裡并沒有用到逾時機制。RabbitMQ僅僅通過Consumer的連接配接中斷來确認該Message并沒有被正确處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做資料處理。
預設情況下,消息确認是打開的(enabled)。在上篇文章中我們通過no_ack = True 關閉了ack。重新修改一下callback,以在消息處理完成後發送ack:
[python] view plaincopy
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_consume(callback,
- queue='hello')
這樣即使你通過Ctr-C中斷了worker.py,那麼Message也不會丢失了,它會被分發到下一個Consumer。
如果忘記了ack,那麼後果很嚴重。當Consumer退出時,Message會重新分發。然後RabbitMQ會占用越來越多的記憶體,由于RabbitMQ會長時間運作,是以這個“記憶體洩漏”是緻命的。去調試這種錯誤,可以通過一下指令列印un-acked Messages:
[python] view plaincopy
- $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
- Listing queues ...
- hello 0 0
- ...done.
4. Message durability消息持久化
在上一節中我們知道了即使Consumer異常退出,Message也不會丢失。但是如果RabbitMQ Server退出呢?軟體都有bug,即使RabbitMQ Server是完美毫無bug的(當然這是不可能的,是軟體就有bug,沒有bug的那不叫軟體),它還是有可能退出的:被其它軟體影響,或者系統重新開機了,系統panic了。。。
為了保證在RabbitMQ退出或者crash了資料仍沒有丢失,需要将queue和Message都要持久化。
queue的持久化需要在聲明時指定durable=True:
[python] view plaincopy
- channel.queue_declare(queue='hello', durable=True)
上述語句執行不會有什麼錯誤,但是确得不到我們想要的結果,原因就是RabbitMQ Server已經維護了一個叫hello的queue,那麼上述執行不會有任何的作用,也就是hello的任何屬性都不會被影響。這一點在上篇文章也讨論過。
那麼workaround也很簡單,聲明一個另外的名字的queue,比如名字定位task_queue:
[python] view plaincopy
- channel.queue_declare(queue='task_queue', durable=True)
再次強調,Producer和Consumer都應該去建立這個queue,盡管隻有一個地方的建立是真正起作用的:
接下來,需要持久化Message,即在Publish的時候指定一個properties,方式如下:
[python] view plaincopy
- channel.basic_publish(exchange='',
- routing_key="task_queue",
- body=message,
- properties=pika.BasicProperties(
- delivery_mode = 2, # make message persistent
- ))
關于持久化的進一步讨論:
為了資料不丢失,我們采用了:
- 在資料處理結束後發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。
- 持久化queue,可以防止RabbitMQ Server 重新開機或者crash引起的資料丢失。
- 持久化Message,理由同上。
但是這樣能保證資料100%不丢失嗎?
答案是否定的。問題就在與RabbitMQ需要時間去把這些資訊存到磁盤上,這個time window雖然短,但是它的确還是有。在這個時間視窗内如果資料沒有儲存,資料還會丢失。還有另一個原因就是RabbitMQ并不是為每個Message都做fsync:它可能僅僅是把它儲存到Cache裡,還沒來得及儲存到實體磁盤上。
是以這個持久化還是有問題。但是對于大多數應用來說,這已經足夠了。當然為了保持一緻性,你可以把每次的publish放到一個transaction中。這個transaction的實作需要user defined codes。
那麼商業系統會做什麼呢?一種可能的方案是在系統panic時或者異常重新開機時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能exit gracefully。
5. Fair dispatch 公平分發
你可能也注意到了,分發機制不是那麼優雅。預設狀态下,RabbitMQ将第n個Message分發給第n個Consumer。當然n是取餘後的。它不管Consumer是否還有unacked Message,隻是按照這個預設機制進行分發。
那麼如果有個Consumer工作比較重,那麼就會導緻有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那麼,RabbitMQ是如何處理這種問題呢?
通過 basic.qos 方法設定prefetch_count=1 。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會将新的Message分發給它。 設定方法如下:
[python] view plaincopy
- channel.basic_qos(prefetch_count=1)
注意,這種方法可能會導緻queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者建立更多的virtualHost來細化你的設計。
6. 最終版本
new_task.py script:
[python] view plaincopy
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='task_queue', durable=True)
- message = ' '.join(sys.argv[1:]) or "Hello World!"
- channel.basic_publish(exchange='',
- routing_key='task_queue',
- body=message,
- properties=pika.BasicProperties(
- delivery_mode = 2, # make message persistent
- ))
- print " [x] Sent %r" % (message,)
- connection.close()
worker.py script:
[python] view plaincopy
- #!/usr/bin/env python
- import pika
- import time
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- channel.queue_declare(queue='task_queue', durable=True)
- print ' [*] Waiting for messages. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
- ch.basic_ack(delivery_tag = method.delivery_tag)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(callback,
- queue='task_queue')
- channel.start_consuming()