天天看點

RabbitMQ消息隊列

一、RabbitMQ安裝

1、簡介

RabbitMQ是一個消息代理:它接受和轉發消息。可以将其視為郵局,當你要發送的郵件放入郵局中時,你可以确認郵件是否安全的達到接收者的手中,這裡RabbitMQ就相當于郵局的角色。

2、RabbitMQ的安裝

  • 安裝erlang

因為RabbitMQ是erlang語言開發的,是以需要先安裝erlang。

從EPEL源安裝:

[root@localhost ~]# yum install epel-release      
[root@localhost ~]# yum install erlang       
  • 安裝RabbitMQ

下載下傳:

[root@localhost ~]# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm      

安裝:

[root@localhost ~]# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm       

啟動服務:

[root@localhost bin]# rabbitmq-server start      

檢視使用者以及權限:

檢視使用者:rabbitmqctl list_users  

檢視使用者權限:rabbitmqctl list_user_permissions guest

新增使用者: rabbitmqctl add_user admin 123

賦予管理者權限:

rabbitmqctl set_user_tags admin administrator 

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"       

啟動web監控:

[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management      

通路預設端口15672

RabbitMQ消息隊列

另外,如果操作RabbitMQ,需要安裝API,進行操作,在開發環境中安裝pika子產品

pip install pika      

二、六種工作模式

1、生産者消費者模式

RabbitMQ消息隊列

“P”是生産者,“C”是消費者。中間的框是一個隊列 ,用于存放消息。

生産者是将任務放入到隊列中:

import pika

#建立使用者名密碼
credentials = pika.PlainCredentials("admin","123")
#建立連接配接
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()
# 建立一個隊列
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello', # 消息隊列名稱
                      body='helloworld')
connection.close()      

消費者是取出任務隊列并且進行處理:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

# 建立一個隊列,如果已經存在就不會重新建立
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

channel.basic_consume('hello',callback,auto_ack=True)

channel.start_consuming()      

詳情參考:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

2、 競争消費者模式

RabbitMQ消息隊列

  (1)消息确認auto_ack

  如果消費者(上圖中的C1和C2)處理從隊列取出的任務,但是沒有完成時就已經挂掉了,那麼如果使用之前的代碼auto_ack=True,一旦RabbitMQ向消費者傳遞任務,它立即将其标記為删除。在這種情況下,如果挂掉一個消費者,将丢失它剛剛處理的任務。

   為了確定任務永不丢失,RabbitMQ支援消息确認。消費者發回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由删除它。

  如果消費者挂掉(其通道關閉,連接配接關閉或TCP連接配接丢失)而不發送确認,RabbitMQ将未完全處理的任務并重新排隊。如果同時有其他線上消費者,則會迅速将其重新發送給其他消費者。

主要變動在于消費者的變動:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

# 建立一個隊列,如果已經存在就不會重新建立
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

    ch.basic_ack(delivery_tag=method.delivery_tag)#防止消費者挂掉任務丢失
channel.basic_consume('hello',callback,auto_ack=False)#預設情況auto_ack=False手動消息确認打開

channel.start_consuming()      

生産者未變動:

import pika

#建立使用者名密碼
credentials = pika.PlainCredentials("admin","123")
#建立連接配接
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()
# 建立一個隊列
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello', # 消息隊列名稱
                      body='你好')
connection.close()      

  (2)消息持久性

如果RabbitMQ伺服器停止,隊列中任務仍然會丢失。此時需要使用durable 參數,聲明隊列是持久的,但是此時以前的隊列就不可以使用,持久的隊列需要重新聲明,也就是說需要改一下隊列名稱。

主要變動在與生産者的變動:

import pika

#建立使用者名密碼
credentials = pika.PlainCredentials("admin","123")
#建立連接配接
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()
# 建立一個持久化隊列
channel.queue_declare(queue = 'task_queue',durable = True)

channel.basic_publish(exchange='',
                      routing_key='task_queue', # 消息隊列名稱
                      body='你好',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 将消息标記為持久性
                      ))

connection.close()      

消費者未變動:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

# 聲明一個隊列,已經建立就不會再建立了
channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

channel.basic_consume(‘task_queue’,callback,auto_ack=True)

channel.start_consuming()      

  (3)公平派遣

  可以看到上面的圖,一個生産者,兩個消費者,生産者将任務不斷的放入到隊列中,消費者不斷的取出任務,那麼這兩個消費者是如何取任務的呢?

  RabbitMQ預設情況下是均勻地發送消息,也就是消費者一個接一個的取出任務。這樣如果一個消費者處理任務的時間比較長,還是均勻的給任務,勢必造成一個消費者将經常忙,而另一個會很閑。

  此時可以通過basic.qos方法和 prefetch_count = 1設定,将任務發送給空閑的消費者。

變動主要在消費者:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

# 聲明一個隊列(建立一個隊列)
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

channel.basic_qos(prefetch_count=1)
channel.basic_consume('hello',callback,auto_ack=True)

channel.start_consuming()      
import pika
# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104'))

# 有密碼
credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()
# 聲明一個隊列(建立一個隊列)
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello', # 消息隊列名稱
                      body='jjfdk')
connection.close()      

詳情參考:https://www.rabbitmq.com/tutorials/tutorial-two-python.html

 3、釋出/訂閱

RabbitMQ消息隊列

  在之前的RabbittMQ中主要用于将任務放入一個隊列鐘,然後消費者分别取出任務進行處理,而釋出訂閱是每一個消費者都将擁有自己的一個隊列,進而擷取消息。這樣每一個消費者都将擁有相同的消息。

  在上述模型中,“P”是生産者,也就是消息制造者,“X”是exchange ,用于将生産出來的消息給每一個隊列給一份,中間紅色的就是隊列,“C1”和“C2”是消息接受者。

釋出者:

import pika
credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='ex1',exchange_type='fanout')#fanout工作方式為ex1每一個隊列添加消息

channel.basic_publish(exchange='ex1',
                      routing_key='',
                      body='abcd')

connection.close()      

訂閱者:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

# exchange='ex1',exchange的名稱
# exchange_type='fanout' , 工作方式将消息發送給所有的隊列
channel.exchange_declare(exchange='ex1',exchange_type='fanout')

# 随機生成一個隊列
result = channel.queue_declare(queue = '',exclusive = True)
queue_name = result.method.queue
# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='ex1',queue=queue_name)      

詳情參考:https://www.rabbitmq.com/tutorials/tutorial-three-python.html 

4、關鍵字釋出/訂閱

 上述釋出的工作方式是:

exchange_type='fanout'      

将消息發送給所有隊列,而選擇性的釋出/訂閱,需要使用exchange_type='direct'和routing_key=""進行關鍵字釋出訂閱。

RabbitMQ消息隊列

釋出者

import pika
credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='ex2',exchange_type='direct')

channel.basic_publish(exchange='ex2',
                      routing_key='gh',
                      body='nhgjod')

connection.close()      

exchange名稱為“ex2”,工作方式為“direct”,然後将exchange與關鍵字routing_key關鍵字進行綁定。

訂閱者1

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='ex2',exchange_type='direct')

# 随機生成一個隊列
result = channel.queue_declare(queue = '',exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='ex2',queue=queue_name,routing_key='bright')
channel.queue_bind(exchange='ex2',queue=queue_name,routing_key='gh')      

exchange與queue以及關鍵字進行綁定,routing_key=“gh”可以收到消息。

訂閱者2

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='ex2',exchange_type='direct')

# 随機生成一個隊列
result = channel.queue_declare(queue = '',exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='ex2',queue=queue_name,routing_key='bright')

def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

channel.basic_consume(queue_name,callback,auto_ack=True)

channel.start_consuming()      

routing_key=“bright”不能接收到關鍵字為“gh”的釋出者釋出的消息。

詳情參考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html

5、關鍵字模糊比對釋出

RabbitMQ消息隊列

可以看到使用type=topic,以及使用“#”以及“*”:

當隊列綁定“#”綁定routing_key時,比對任意字元。 
當特殊字元“*”綁定routing_key時,比對一個單詞。      

消息釋出:

import pika
credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='ex3',exchange_type='topic')

channel.basic_publish(exchange='ex3',
                      routing_key='bright.gh.km',
                      body='abcdefd')

connection.close()      

可以看到關鍵字routing_key='bright.gh.km',其餘的與之前釋出/訂閱并沒什麼差別。

消息訂閱1:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()


channel.exchange_declare(exchange='ex3',exchange_type='topic')

# 随機生成一個隊列
result = channel.queue_declare(queue="",exclusive=True)
queue_name = result.method.queue
# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='ex3',queue=queue_name,routing_key='bright.*')


def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

channel.basic_consume(queue_name,callback,auto_ack=True)

channel.start_consuming()      

這時可以看到訂閱者1的routing_key='bright.*',可以比對釋出消息以bright開頭的後面跟一個單詞,是以釋出者的消息它接收不到。

訂閱者2:

import pika

credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()


channel.exchange_declare(exchange='ex3',exchange_type='topic')

# 随機生成一個隊列
result = channel.queue_declare(queue="",exclusive=True)
queue_name = result.method.queue
# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='ex3',queue=queue_name,routing_key='bright.#')


def callback(ch, method, properties, body):
    print("消費者接受到了任務: %r" % body)

channel.basic_consume(queue_name,callback,auto_ack=True)

channel.start_consuming()      

這時可以看到訂閱者1的routing_key='bright.#',可以比對釋出消息以bright開頭的後面多個單詞、字元,是以釋出者的消息它可以接收到,這也就說明“#”比“*”更強大。

詳情參考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

6、遠端過程調用(RPC)

RabbitMQ消息隊列

RPC工作方式:

  • 當用戶端啟動時,它會建立一個随機回調隊列。
  • 對于RPC請求,用戶端發送帶有兩個屬性的消息: reply_to,設定為回調隊列,correlation_id,設定為每個請求的唯一值。
  • 請求被發送到rpc_queue隊列。
  • Server正在等待rpc_queue上的請求。當出現請求時,它會執行任務并使用reply_to字段中的隊列将結果傳回給用戶端。
  • 用戶端等待回調隊列上的資料。出現消息時,它會檢查correlation_id屬性。如果它與請求中的值比對,則将響應傳回給應用程式。

用戶端:

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("admin", "123")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104', credentials=credentials))
        self.channel = self.connection.channel()

        # 随機生成一個消息隊列(用于接收結果)
        result = self.channel.queue_declare(queue="",exclusive=True)
        self.callback_queue = result.method.queue

        # 監聽消息隊列(傳回結果的隊列)中是否有值傳回,如果有值則執行 on_response 函數(一旦有結果,則執行on_response)
        self.channel.basic_consume(self.callback_queue,self.on_response, auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 發送一個任務包含的内容:  任務id = corr_id ;任務内容 = '10' ;用于接收結果的隊列名稱
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue', # 接收任務的隊列名稱
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue, # 用于接收結果的隊列
                                         correlation_id = self.corr_id, # 任務ID
                                         ),
                                   body=str(n))

        while self.response is None:
            self.connection.process_data_events()

        return self.response

fibonacci_rpc = FibonacciRpcClient()

response = fibonacci_rpc.call(10)
print('結果:',response)      

在用戶端需要做以下的事情:

  • 建立連接配接
  • 随機生成用于接收傳回結果的消息隊列
  • 監聽接收傳回結果的消息隊列,看是否有結果(是否執行on_response)
  • 定義主調用方法,執行rpc請求,包含請求的詳細資訊
  • 等待響應

服務端:

import pika
credentials = pika.PlainCredentials("admin","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
channel = connection.channel()

# 監聽任務隊列,是否有任務到來
channel.queue_declare(queue='rpc_queue')

def on_request(ch, method, props, body):
    n = int(body)
    response = n*100
    # props.reply_to  要放結果的隊列.
    # props.correlation_id  任務id
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue = 'rpc_queue',on_message_callback = on_request)
channel.start_consuming()      

在服務端需要做以下的事情:

  • 監聽請求隊列,看是否有請求到來。
  • 為basic_consume聲明了一個回調,它是RPC伺服器的核心。它在收到請求時執行。處理請求并發回響應。
  • 想要運作多個伺服器程序。為了在多個伺服器上平均配置設定負載,需要設定prefetch_count。

詳情參考:https://www.rabbitmq.com/tutorials/tutorial-six-python.html

作者:iveBoy

出處:http://www.cnblogs.com/shenjianping/

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須在文章頁面給出原文連接配接,否則保留追究法律責任的權利。

上一篇: docker基礎