Python使用RabbitMQ
接下來就使用Python來簡單的對以下幾種隊列類型進行一部分的操作,主要是為了更加容易去了解它,站在開發的角度去看待RabbitMQ;
建立通路使用者
# 建立一個cce使用者
[[email protected] ~]# rabbitmqctl add_user cce caichangen
# 建立一個虛拟主機
[[email protected] ~]# rabbitmqctl add_vhost simpleTest
# 賦予使用者一個角色
[[email protected] ~]# rabbitmqctl set_user_tags cce administrator
# 對該使用者進行授權
[[email protected] ~]# rabbitmqctl set_permissions -p simpleTest cce ".*" ".*" ".*"
簡單的實作隊列
# 生産者
import pika
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, credentials=auth, virtual_host='simpleTest'))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明需要開啟的queue名稱
channel.queue_declare(queue='simpleTest')
# RabbitMQ消息永遠不能直接發送到隊列,它總是需要通過交換
# 将消息發送到隊列
for num in range(1, 5):
channel.basic_publish(exchange='',routing_key='hello',
body='Hello World!' + str(num))
print("已經向RabbitMQ中送出資料'")
connection.close()
# 消費者
import pika
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1',port=5672,virtual_host='simpleTest',credentials=auth))
channel = connection.channel()
# 聲明需要接收消息的通道
channel.queue_declare(queue='hello')
# 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的,ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print("從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
# 指定消費者參數,回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
# 開始接收資訊,并進入阻塞狀态,隊列裡有資訊才會調用callback進行處理。實際就是一個select監聽
channel.start_consuming()
消息确認機制
如上,我們已經實作了基礎的隊列功能,那麼就帶來了另一個問題,當我們的消費者接受到一個消息的時候,這個消息比較耗時,如果在這個消費的過程中伺服器記憶體溢出等其他情況導緻消息沒有完成消費,那麼此時為了解決這個消息的遺漏問題,RabbitMQ提供了一個消息确認機制,當消息交給消費者之後,如果消費者消費完成,需要給RabbitMQ回複一個消息确認資訊,如果這個資訊沒有被确認消費成功那麼當沒有消費成功的用戶端和RabbitMQ的連接配接斷開之後會把這個消費重新配置設定給其他的用戶端進行消費;
那麼解決上面用戶端的問題,那麼往深了想我們的RabbitMQ服務端也可能會出現這個問題,當RabbitMQ挂掉之後,那麼這個消息也不應該丢失,是以RabbitMQ的持久化機制給我們帶來了很好的體驗;
消息持久化
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就當機(或出現其他意外)的情況,這種情況下就可能會導緻消息丢失。為了避免這種情況發生,我們可以要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後才将該消息從Queue中移除;如果RabbitMQ沒有收到回執并檢測到消費者的RabbitMQ連接配接斷開,則RabbitMQ會将該消息發送給其他消費者(如果存在多個消費者)進行處理。這裡不存在timeout概念,一個消費者處理消息時間再長也不會導緻該消息被發送給其他消費者,除非它的RabbitMQ連接配接斷開。 這裡會産生另外一個問題,如果我們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這将會導緻嚴重的bug——Queue中堆積的消息會越來越多;消費者重新開機後會重複消費這些消息并重複執行業務邏輯…千面是因為我們在消費者端标記了ACK=True關閉了它們,如果你沒有增加ACK=True或者沒有回執就會出現這個問題;
關鍵參數:
durable:Bool值,代表queue隊列通道是否持久化,想要消息持久化,那麼隊列必須持久化;
# 生産者需要在發送消息的時候标注屬性為持久化
import pika
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1',port=5672,virtual_host='simpleTest',credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明需要開啟的queue名稱
channel.queue_declare(queue='task_queue')
# RabbitMQ消息永遠不能直接發送到隊列,它總是需要通過交換
# 将消息發送到隊列
channel.basic_publish(exchange='', # 使用預設交換器
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties( # 消息持久化
delivery_mode=2, # 設為2表示标記消息為持久化
))
print("已經向RabbitMQ中送出資料'")
connection.close()
# 消費者需要發送消息回執
import pika
import time
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1',port=5672,virtual_host='simpleTest',credentials=auth))
channel = connection.channel()
# 聲明需要接收消息的通道
channel.queue_declare(queue='hello')
# ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print(" 從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
time.sleep(5)
ch.basic_ack(delivery_tag=method.delivery_tag) # 這段代碼表示确認這個消息消費完成,傳回這個消息的唯一辨別符,發送消息确認,确認交易辨別符
print('資料消費完成')
# 指定消費的回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume('hello', callback, auto_ack=False) # 這裡需要将自動确認消息設定為False
# 開啟消息死循環,實際就是一個select監聽
channel.start_consuming()
RabbitMQ檢視沒有被ACK的消息
# Linux
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
隊列持久化
如果我們希望即使在RabbitMQ服務重新開機的情況下,也不會丢失消息,我們可以将Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丢失。但依然解決不了小機率丢失事件的發生(比如RabbitMQ伺服器已經接收到生産者的消息,但還沒來得及持久化該消息時RabbitMQ伺服器就斷電了),如果我們需要對這種小機率事件也要管理起來,那麼我們要用到事務。由于這裡僅為RabbitMQ的簡單介紹,是以這裡将不講解RabbitMQ相關的事務。 這裡我們需要修改下生産者和消費者設定RabbitMQ消息的持久化**[生産者/消費者]都需要配置;
channel.queue_declare(queue='task_queue', durable=True) # 隊列持久化
關鍵參數:
durable:Bool值,代表queue隊列通道是否持久化,想要消息持久化,那麼隊列必須持久化;
delivery_mode:标記RabbitMQ服務端消息持久化,值為2的時候為持久化其他任何值都是瞬态的;
# 生産者消息持久化
import pika
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1',port=5672,virtual_host='simpleTest',credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明需要開啟的queue名稱
channel.queue_declare(queue='hello',durable=True)
# RabbitMQ消息永遠不能直接發送到隊列,它總是需要通過交換
# 将消息發送到隊列
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print("已經向RabbitMQ中送出資料'")
connection.close()
公平分發
預設情況下RabitMQ會把隊列裡面的消息立即發送到消費者,無論該消費者有多少消息沒有應答,也就是說即使發現消費者來不及處理,新的消費者加入進來也沒有辦法處理已經堆積的消息,因為那些消息已經被發送給老消費者了。
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多于N個消息,即一旦有N個消息還沒有ack,則該consumer将block掉,直到有消息ack。 這樣做的好處是,如果系統處于高峰期,消費者來不及處理,消息會堆積在隊列中,新啟動的消費者可以馬上從隊列中取到消息開始工作。
在消費者中增加channel.basic_qos(prefetch_count=1)實作公平分發

工作過程如下:
1、消費者1接收到消息後處理完畢發送了ack并接收新的消息并處理;
2、消費者2接收到消息後處理完畢發送了ack并接收新的消息并處理;
3、消費者3接收到消息後一直處于消息中并沒有發送ack不在接收消息一直等到消費者3處理完畢後發送ACK後再接收新消息;
釋出訂閱(publish/subscribe)
在前面我們學了work Queue它主要是把每個任務分給一個worker[工作者]接下來我們要玩些不同的,把消息發多個消費者(不同的隊列中). 這個就要用到Exchange了,這個模式稱之為"釋出訂閱";
訂閱的方式他和一對一不一樣,消費者沒有連接配接的時候消息會進行持久化,但是釋出訂閱當消費者沒有連接配接的時候,那麼這個消息就丢失了,并且釋出訂閱是每個收聽者一個隊列,如果和别人共享一個隊列,那麼就可能出現,A把消息收走了,B卻沒收到;
因為在釋出訂閱的模式下面每個隊列都需要接收到消息生産者不可能每個隊列每個隊列的發,是以這個時候就需要用到Exchange了,它作為轉發器負責将廣播發送到每個指定的隊列,Exchange還需要維護一個訂閱清單,是以它才知道需要将廣播發送給哪個收聽者;
關鍵參數:
queue_bind:當我們建立了Exchanges和(QUEUE)隊列後,我們需要告訴Exchange發送到們的Queue隊列中,所需要需要把Exchange和隊列(Queue)進行綁定,
Exchanges可用類型有四種:
direct:多點傳播,通過routingkey和exchange決定的哪一組queue可以接收消息;
fanout:廣播,所有bind到此exchage的queue都會接收到消息;
topic:所有符合routingkey(可以是一個表達式)的rroutingkey索bind的queue可以接收消息;
表達式符号:
#:代表一個或多個字元;
*:比對前面或者後面任何字元;
#.a:會比對a.a,aa.a,aaa.a等;
a.#:會比對a.a,a.aa,a.aaa等;
*.a:會比對a.a,aa.a,aaa.a等;
a.*:會比對a.aa,a.aaa,a.aaa等;
#:比對所有;
注意:使用routingkey為#,exchange Type為topic的時候相當于fanout;
headers:通過headers消息頭來決定把消息發給哪些queue;
fanout廣播模式
模式特點:
可以了解他是一個廣播模式;
不需要routing key它的消息發送時通過Exchange binding進行路由的~~在這個模式下routing key失去作用;
這種模式需要提前将Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定;
如果接收到消息的Exchange沒有與任何Queue綁定,則消息會被抛棄;
示例如下
# 廣播模式生産者
import pika
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1',port=5672,virtual_host='simpleTest',credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和exchange的類型
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='caidaye')
print("已經向RabbitMQ轉發器發送廣播")
connection.close()
# 廣播模式消費者
import pika
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和exchange的類型
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(queue='',
exclusive=True) # 聲明隊列,這個隊列用來接收廣播,因為說了為了保證消息可達,需要隊列唯一,一人一隊列,使用exclusive=True表示不讓其他人加入到這個隊列,不指定queue名稱rabbitmq會自動生成一個,當這個消費者斷開之後會自動删掉,重新生成一個queue
queue_name = result.method.queue # 獲得rabbitmq自動生成的隊列名
channel.queue_bind(exchange='logs', queue=queue_name)
# ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print(" 從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
# 指定消費的回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume(queue_name, callback, auto_ack=True) # 這裡需要将自動确認消息設定為False
# 開啟消息死循環,實際就是一個select監聽
channel.start_consuming()
direct多點傳播模式
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue
1、一般情況可以使用rabbitMQ自帶的Exchange:" "(該Exchange的名字為空字元串),也可以自定義Exchange;
2、這種模式下不需要将Exchange進行任何綁定(bind)操作。當然也可以進行綁定。可以将不同的routing_key與不同的queue進行綁定,不同的queue與不同exchange進行綁定;
3、消息傳遞時需要一個“routing_key”;
4、如果消息中不存在routing_key中綁定的隊列名,則該消息會被抛棄;
如果一個exchange 聲明為direct,并且bind中指定了routing_key,那麼發送消息時需要同時指明該exchange和routing_key;
簡而言之就是:生産者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送消費者:訂閱Exchange并根據Exchange類型和binding key(bindings 中的routing key),如果生産者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。
老規矩還是通過執行個體來說:
在上面的文檔中我們建立了一個簡單的日志系統,我們把消息發給所有的訂閱者 在下面的内容中将把特定的消息發給特定的訂閱者,舉個例子來說,把error級别的報警寫如檔案,并把所有的報警列印到螢幕中,進行了路由的規則類似下面的架構;
這裡也要注意一個routing key 是可以綁定多個隊列的
在上面我們已經建立過bindings了類似下面
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
消費者端:Bindings可以增加routing_key 這裡不要和basic_publish中的參數弄混了,我們給它稱之為**binding key**
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
binding key的含義依賴于Exchange類型,fanout exchanges類型隻是忽略了它
示例如下
# 多點傳播模式生産者
import pika
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和exchange的類型
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='info', body='info')
print("已經向RabbitMQ轉發器發送info多點傳播")
channel.basic_publish(exchange='direct_logs', routing_key='error', body='error')
print("已經向RabbitMQ轉發器發送error多點傳播")
connection.close()
# 多點傳播模式用戶端1
import pika
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和direct的類型
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True) #聲明隊列,這個隊列用來接收廣播,因為說了為了保證消息可達,需要隊列唯一,一人一隊列,使用exclusive=True表示不讓其他人加入到這個隊列,不指定queue名稱rabbitmq會自動生成一個,當這個消費者斷開之後會自動删掉,重新生成一個queue
queue_name = result.method.queue # 獲得rabbitmq自動生成的隊列名
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key='info')
# ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print(" 從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
# 指定消費的回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume(queue_name, callback, auto_ack=True) # 這裡需要将自動确認消息設定為False
# 開啟消息死循環,實際就是一個select監聽
channel.start_consuming()
# 從RabbitMQ擷取的資料是:info
# 多點傳播模式用戶端2
import pika
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和exchange的類型
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(queue='',exclusive=True) #聲明隊列,這個隊列用來接收廣播,因為說了為了保證消息可達,需要隊列唯一,一人一隊列,使用exclusive=True表示不讓其他人加入到這個隊列,不指定queue名稱rabbitmq會自動生成一個,當這個消費者斷開之後會自動删掉,重新生成一個queue
queue_name = result.method.queue # 獲得rabbitmq自動生成的隊列名
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key='error')
# ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print(" 從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
# 指定消費的回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume(queue_name, callback, auto_ack=True) # 這裡需要将自動确認消息設定為False
# 開啟消息死循環,實際就是一個select監聽
channel.start_consuming()
# 從RabbitMQ擷取的資料是:error
topic類型
前面講到direct類型的Exchange路由規則是完全比對binding key與routingkey,但這種嚴格的比對方式在很多情況下不能滿足實際業務需求;
topic類型的Exchange在比對規則上進行了擴充,它與direct類型的Exchage相似,也是将消息路由到binding key與routing key相比對的Queue中,但這裡的比對規則有些不同;
它約定:
routing key為一個句點号“. ”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”;
binding key與routing key一樣也是句點号“. ”分隔的字元串,以.分割這點很重要----------------------------------;
binding key中可以存在兩種特殊字元“*”與“#”,用于做模糊比對,其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個);
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(隻會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都比對);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingKey;
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
示例如下
# 多點傳播模式topic服務端
import pika
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和exchange的類型
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.basic_publish(exchange='topic_logs', routing_key='mysql.caaa', body='info')
print("已經向RabbitMQ轉發器發送info多點傳播")
channel.basic_publish(exchange='topic_logs', routing_key='mysql.cbbb', body='error')
print("已經向RabbitMQ轉發器發送error多點傳播")
connection.close()
# 多點傳播模式topic用戶端1
import pika
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和direct的類型
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
result = channel.queue_declare(queue='',exclusive=True) #聲明隊列,這個隊列用來接收廣播,因為說了為了保證消息可達,需要隊列唯一,一人一隊列,使用exclusive=True表示不讓其他人加入到這個隊列,不指定queue名稱rabbitmq會自動生成一個,當這個消費者斷開之後會自動删掉,重新生成一個queue
queue_name = result.method.queue # 獲得rabbitmq自動生成的隊列名
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key='mysql.caaa')
# ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print(" 從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
# 指定消費的回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume(queue_name, callback, auto_ack=True) # 這裡需要将自動确認消息設定為False
# 開啟消息死循環,實際就是一個select監聽
channel.start_consuming()
# 從RabbitMQ擷取的資料是:info
# 多點傳播模式topic用戶端2
import pika
# 指定使用者名和密碼
auth = pika.PlainCredentials('cce', 'caichangen')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.16.1.1', port=5672, virtual_host='simpleTest', credentials=auth))
# 開啟一個rabbitMQ協定通道
channel = connection.channel()
# 通過通道聲明exchange和exchange的類型
channel.exchange_declare(exchange='topic_logs',
exchange_type='topic')
result = channel.queue_declare(queue='',exclusive=True) #聲明隊列,這個隊列用來接收廣播,因為說了為了保證消息可達,需要隊列唯一,一人一隊列,使用exclusive=True表示不讓其他人加入到這個隊列,不指定queue名稱rabbitmq會自動生成一個,當這個消費者斷開之後會自動删掉,重新生成一個queue
queue_name = result.method.queue # 獲得rabbitmq自動生成的隊列名
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key='mysql.*')
# ch就是上面的channel執行個體
def callback(ch, method, properties, body):
print(" 從RabbitMQ擷取的資料是:%s" % body.decode('utf-8'))
# 指定消費的回調函數,和接收消息的通道,一收到消息就會調callback函數
channel.basic_consume(queue_name, callback, auto_ack=True) # 這裡需要将自動确認消息設定為False
# 開啟消息死循環,實際就是一個select監聽
channel.start_consuming()
# 從RabbitMQ擷取的資料是:info
# 從RabbitMQ擷取的資料是:error