天天看點

python rabitmq_Python-RabbitMQ消息隊列

python中的線程queue可以實作不同線程間的通信,程序queue可以實作python不同程序間的通信

RabbitMQ消息隊列就相當于中間人,可以實作獨立程序間的通信,也可以實作在不同程式設計語言中進行通信

python rabitmq_Python-RabbitMQ消息隊列

windows環境下安裝完成RabbitMQ後,輸入cmd指令services.msc,然後在服務中開啟RabbitMQ的服務,使用RabbitMQ要安裝Erlang語言環境

python rabitmq_Python-RabbitMQ消息隊列

Ubuntu環境下安裝RabbitMQ

[email protected]:~$ sudo apt install rabbitmq-server

[email protected]:~$ sudo rabbitmq-server start

RabbitMQ預設的監聽端口為5672

發送消息端

# -*- coding:utf-8 -*-

__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘)) # 建立一個socket

channel = connection.channel() # 聲明一個管道,在管道裡進行通信

channel.queue_declare(queue=‘q‘) # 在管道裡聲明一個名q為queue

channel.basic_publish( # 發送消息

exchange=‘‘,

routing_key=‘q‘, # queue名字

body=‘Hello World!‘, # 要發送的消息

)

print(‘資料發送完成‘)

connection.close() # 關閉隊列

接收消息端

# -*- coding:utf-8 -*-

__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))

channel = connection.channel() # 聲明一個管道,在管道裡進行通信

channel.queue_declare(queue=‘q‘) # 多聲明一個queue,防止當此程式比發送消息端先啟動時報錯

def callback(ch, method, properties, body):

print(ch, method, properties, body)

# ch為管道的記憶體位址,method為關于queue隊列的一些資訊,body為消息内容

print(‘收到資料:‘, body)

channel.basic_consume( # basic_consume開始消費消息

callback, # 如果收到消息就調用callback函數來處理消息

queue=‘q‘, # 從q隊列裡接收消息

no_ack=True, # 是否不确認消息是否處理完,預設為False

)

print(‘開始等待消息‘)

channel.start_consuming() # 開始接收消息,如果沒有消息就會卡在這,直到有消息

開啟三個接收消息端

python rabitmq_Python-RabbitMQ消息隊列

發送消息端發送一個消息,最先開啟的接收消息端先收到消息

python rabitmq_Python-RabbitMQ消息隊列

發送消息端再發送消息的話,接收消息的就是第二開啟的接收消息端,然後是第三個接收消息端,之後再是第一個

RabbitMQ會輪詢發消息

RabbitMQ安裝目錄下的skin目錄裡rabbitmqctl.bat可以檢視目前隊列情況

rabbitmqctl.bat list_queues

python rabitmq_Python-RabbitMQ消息隊列

接收消息端處理消息時要跟伺服器端确認消息處理的情況,以防止接收消息端在處理消息時突然停止運作導緻消息丢失

發送消息端

# -*- coding:utf-8 -*-

__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘)) # 建立一個socket

channel = connection.channel() # 聲明一個管道,在管道裡進行通信

channel.queue_declare(queue=‘q‘) # 在管道裡聲明一個名q為queue

channel.basic_publish( # 發送消息

exchange=‘‘,

routing_key=‘q‘, # queue名字

body=‘Hello World!‘, # 要發送的消息

)

print(‘資料發送完成‘)

connection.close() # 關閉隊列

接收消息端

# -*- coding:utf-8 -*-

__author__ = "MuT6 Sch01aR"

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))

channel = connection.channel() # 聲明一個管道,在管道裡進行通信

channel.queue_declare(queue=‘q‘) # 多聲明一個queue,防止當此程式比生産者程式先啟動時報錯

def callback(ch, method, properties, body):

print(ch, method, properties, body)

# ch為管道的記憶體位址,method為關于queue隊列的一些資訊,body為消息内容

time.sleep(20)

print(‘收到資料:‘, body)

ch.basic_ack(delivery_tag=method.delivery_tag) # 跟伺服器端确認消息已經處理完成

print(‘消息處理完成‘)

channel.basic_consume( # basic_consume開始消費消息

callback, # 如果收到消息就調用callback函數來處理消息

queue=‘q‘, # 從q隊列裡接收消息

)

print(‘開始等待消息‘)

channel.start_consuming() # 開始接收消息,如果沒有消息就會卡在這,直到有消息

開啟3個消息接收端,1個發送消息端

python rabitmq_Python-RabbitMQ消息隊列

開啟3個接收消息端,等待接收消息

python rabitmq_Python-RabbitMQ消息隊列

發送消息端發送消息,第一個啟動的接收消息端接收到消息

python rabitmq_Python-RabbitMQ消息隊列

然後關掉第一個接收消息端,第二個啟動的接收消息端收到消息

python rabitmq_Python-RabbitMQ消息隊列

然後是第三個,第三個之後還是第一個,除非消息處理完成

消息持久化

如果接收消息端正在接收消息的時候,伺服器端(RabbitMQ)斷了,接收消息端就會報錯,消息就會丢失

python rabitmq_Python-RabbitMQ消息隊列

如果不想伺服器端突然斷開而導緻消息丢失,可以使消息持久化

發送消息端

# -*- coding:utf-8 -*-

__author__ = "MuT6 Sch01aR"

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘)) # 建立一個socket

channel = connection.channel() # 聲明一個管道,在管道裡進行通信

channel.queue_declare(queue=‘q‘, durable=True) # 在管道裡聲明一個名q為queue,durable為隊列持久化

channel.basic_publish( # 發送消息

exchange=‘‘,

routing_key=‘q‘, # queue名字

body=‘Hello World!‘, # 要發送的消息

properties=pika.BasicProperties(delivery_mode=2) # 使消息持久化

)

print(‘資料發送完成‘)

connection.close() # 關閉隊列

接收消息端

# -*- coding:utf-8 -*-

__author__ = "MuT6 Sch01aR"

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘))

channel = connection.channel() # 聲明一個管道,在管道裡進行通信

channel.queue_declare(queue=‘q‘,durable=True) # durable為隊列持久化

def callback(ch, method, properties, body):

print(ch, method, properties, body)

# ch為管道的記憶體位址,method為關于queue隊列的一些資訊,body為消息内容

time.sleep(20)

print(‘收到資料:‘, body)

ch.basic_ack(delivery_tag=method.delivery_tag) # 跟伺服器端确認消息已經處理完成

print(‘消息處理完成‘)

channel.basic_consume( # basic_consume開始消費消息

callback, # 如果收到消息就調用callback函數來處理消息

queue=‘q‘, # 從q隊列裡接收消息

)

print(‘開始等待消息‘)

channel.start_consuming() # 開始接收消息,如果沒有消息就會卡在這,直到有消息

這樣的話,即使伺服器端斷開了,隊列和消息也還會在

python rabitmq_Python-RabbitMQ消息隊列

如果沒有使隊列和消息持久化的話,伺服器端重新開機後,隊列和消息就沒了

python rabitmq_Python-RabbitMQ消息隊列