python中的線程queue可以實作不同線程間的通信,程序queue可以實作python不同程序間的通信
RabbitMQ消息隊列就相當于中間人,可以實作獨立程序間的通信,也可以實作在不同程式設計語言中進行通信

windows環境下安裝完成RabbitMQ後,輸入cmd指令services.msc,然後在服務中開啟RabbitMQ的服務,使用RabbitMQ要安裝Erlang語言環境
Ubuntu環境下安裝RabbitMQ
[email protected]:~$ sudo apt install rabbitmq-server
[email protected]:~$ sudo rabbitmq-server start
RabbitMQ預設的監聽端口為5672
發送消息端
# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘)) # 建立一個socket
channel = connection.channel() # 聲明一個管道,在管道裡進行通信
channel.queue_declare(queue=‘q‘) # 在管道裡聲明一個名q為queue
channel.basic_publish( # 發送消息
exchange=‘‘,
routing_key=‘q‘, # queue名字
body=‘Hello World!‘, # 要發送的消息
)
print(‘資料發送完成‘)
connection.close() # 關閉隊列
接收消息端
# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))
channel = connection.channel() # 聲明一個管道,在管道裡進行通信
channel.queue_declare(queue=‘q‘) # 多聲明一個queue,防止當此程式比發送消息端先啟動時報錯
def callback(ch, method, properties, body):
print(ch, method, properties, body)
# ch為管道的記憶體位址,method為關于queue隊列的一些資訊,body為消息内容
print(‘收到資料:‘, body)
channel.basic_consume( # basic_consume開始消費消息
callback, # 如果收到消息就調用callback函數來處理消息
queue=‘q‘, # 從q隊列裡接收消息
no_ack=True, # 是否不确認消息是否處理完,預設為False
)
print(‘開始等待消息‘)
channel.start_consuming() # 開始接收消息,如果沒有消息就會卡在這,直到有消息
開啟三個接收消息端
發送消息端發送一個消息,最先開啟的接收消息端先收到消息
發送消息端再發送消息的話,接收消息的就是第二開啟的接收消息端,然後是第三個接收消息端,之後再是第一個
RabbitMQ會輪詢發消息
RabbitMQ安裝目錄下的skin目錄裡rabbitmqctl.bat可以檢視目前隊列情況
rabbitmqctl.bat list_queues
接收消息端處理消息時要跟伺服器端确認消息處理的情況,以防止接收消息端在處理消息時突然停止運作導緻消息丢失
發送消息端
# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘)) # 建立一個socket
channel = connection.channel() # 聲明一個管道,在管道裡進行通信
channel.queue_declare(queue=‘q‘) # 在管道裡聲明一個名q為queue
channel.basic_publish( # 發送消息
exchange=‘‘,
routing_key=‘q‘, # queue名字
body=‘Hello World!‘, # 要發送的消息
)
print(‘資料發送完成‘)
connection.close() # 關閉隊列
接收消息端
# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))
channel = connection.channel() # 聲明一個管道,在管道裡進行通信
channel.queue_declare(queue=‘q‘) # 多聲明一個queue,防止當此程式比生産者程式先啟動時報錯
def callback(ch, method, properties, body):
print(ch, method, properties, body)
# ch為管道的記憶體位址,method為關于queue隊列的一些資訊,body為消息内容
time.sleep(20)
print(‘收到資料:‘, body)
ch.basic_ack(delivery_tag=method.delivery_tag) # 跟伺服器端确認消息已經處理完成
print(‘消息處理完成‘)
channel.basic_consume( # basic_consume開始消費消息
callback, # 如果收到消息就調用callback函數來處理消息
queue=‘q‘, # 從q隊列裡接收消息
)
print(‘開始等待消息‘)
channel.start_consuming() # 開始接收消息,如果沒有消息就會卡在這,直到有消息
開啟3個消息接收端,1個發送消息端
開啟3個接收消息端,等待接收消息
發送消息端發送消息,第一個啟動的接收消息端接收到消息
然後關掉第一個接收消息端,第二個啟動的接收消息端收到消息
然後是第三個,第三個之後還是第一個,除非消息處理完成
消息持久化
如果接收消息端正在接收消息的時候,伺服器端(RabbitMQ)斷了,接收消息端就會報錯,消息就會丢失
如果不想伺服器端突然斷開而導緻消息丢失,可以使消息持久化
發送消息端
# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘)) # 建立一個socket
channel = connection.channel() # 聲明一個管道,在管道裡進行通信
channel.queue_declare(queue=‘q‘, durable=True) # 在管道裡聲明一個名q為queue,durable為隊列持久化
channel.basic_publish( # 發送消息
exchange=‘‘,
routing_key=‘q‘, # queue名字
body=‘Hello World!‘, # 要發送的消息
properties=pika.BasicProperties(delivery_mode=2) # 使消息持久化
)
print(‘資料發送完成‘)
connection.close() # 關閉隊列
接收消息端
# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))
channel = connection.channel() # 聲明一個管道,在管道裡進行通信
channel.queue_declare(queue=‘q‘,durable=True) # durable為隊列持久化
def callback(ch, method, properties, body):
print(ch, method, properties, body)
# ch為管道的記憶體位址,method為關于queue隊列的一些資訊,body為消息内容
time.sleep(20)
print(‘收到資料:‘, body)
ch.basic_ack(delivery_tag=method.delivery_tag) # 跟伺服器端确認消息已經處理完成
print(‘消息處理完成‘)
channel.basic_consume( # basic_consume開始消費消息
callback, # 如果收到消息就調用callback函數來處理消息
queue=‘q‘, # 從q隊列裡接收消息
)
print(‘開始等待消息‘)
channel.start_consuming() # 開始接收消息,如果沒有消息就會卡在這,直到有消息
這樣的話,即使伺服器端斷開了,隊列和消息也還會在
如果沒有使隊列和消息持久化的話,伺服器端重新開機後,隊列和消息就沒了