天天看點

celery5 + 用rabbitmq作為結果儲存,而不用redis存儲結果

項目背景:

java調用python 部署的深度學習模型,java前端是用rabbitmq中的隊列send存儲客戶發送的識别請求,現在為了實作一步到位的效果,需要對rabbitmq中的指定隊列send消息進行監測,進而調用模型服務,進行消費,再将結果傳回到rabbitmq指定隊列receive,供java前端進行擷取結果。

方案如下:

一、基于celery與rabbitmq、redis

celery 進行rabbitmq隊列的send消息進行任務分發,模型進行消費,結果傳回存儲到redis或者rpc中

1、若用rpc://,則會根據線程id的建立交換機及隊列名稱,作為結果存儲,需要在使用之後,将該隊列、交換機删除,否則會導緻太多的隊列。

如圖所示:

celery5 + 用rabbitmq作為結果儲存,而不用redis存儲結果

但可設定任務執行結果儲存時限,進行自動删除,參數如下

# 任務執行結果的逾時時間
    result_expires=1 * 3 * 60,  # 24 * 60 * 60   # 小時、分、秒 
           

為了滿足指定交換機、指定隊列儲存結果,需要對源碼進行修改。

以下代碼均在celery/backends/rpc.py中。

exchange = exchange or conf.result_exchange  # 隊列名
        exchange_type = exchange_type or conf.result_exchange_type  # 配置檔案設定指定交換機名稱 為 "se_re",否則為''
        self.exchange = self._create_exchange(
            exchange, exchange_type, self.delivery_mode,
        )  # 建立交換機
       def _create_exchange(self, name, type='direct', delivery_mode=2):
        	# uses direct to queue routing (anon exchange).
        	# return self.Exchange(None)  這裡沒問題  # 此處是之前的,
        	return self.Exchange(name, type, delivery_mode=2)  # 這是改過的
           
@property
    def binding(self):
        SS = self.Queue(
            self.oid, self.exchange, self.oid,
            durable=False,
            auto_delete=True,
            expires=self.expires,
        )
        # 無效嘗試,還是原來的
        # SS = self.Queue(
        #     "receive", self.exchange, "receive",
        #     # durable=False,
        #     durable=True,
        #     # auto_delete=True,
        #     auto_delete=False,
        #     # expires=self.expires,
        # )  
        return SS
    @cached_property
    def oid(self):
        # cached here is the app thread OID: name of queue we receive results on.
        # 這裡緩存的是應用線程OID:我們接收結果的隊列名稱。
        dd = "receive"  # 無效嘗試
        dd = self.app.thread_oid  # 原來的代碼
        return dd
           

為了滿足java查詢rabbitmq的需求,作為指定交換機、指定隊列,做以下修改

def __init__(self, app, connection=None, exchange=None, exchange_type=None,
                 persistent=None, serializer=None, auto_delete=True, **kwargs):
        super().__init__(app, **kwargs)
        conf = self.app.conf
        self._connection = connection
        self._out_of_band = {}
        self.persistent = self.prepare_persistent(persistent)
        self.delivery_mode = 2 if self.persistent else 1
        exchange = exchange or conf.result_exchange    # 配置檔案設定指定交換機名稱 為 "se_re"
        exchange_type = exchange_type or conf.result_exchange_type # 配置檔案設定指定交換機類型 為 "direct"
        self.exchange = self._create_exchange(
            exchange, exchange_type, self.delivery_mode,
        )
        # 添加以下代碼,配置檔案設定指定隊列名字'receive',接收結果
         ***if "result_queue" in conf:  # 判斷該參數是否存在配置檔案中,不存在設定為None
            self.receive_name = conf.result_queue
        else:
            self.receive_name = None***
             
        self.serializer = serializer or conf.result_serializer
        self.auto_delete = auto_delete
        self.result_consumer = self.ResultConsumer(
            self, self.app, self.accept,
            self._pending_results, self._pending_messages,
        )
        if register_after_fork is not None:
            register_after_fork(self, _on_after_fork_cleanup_backend)
           

配置檔案設定:

celery5 + 用rabbitmq作為結果儲存,而不用redis存儲結果

還需修改結果傳回代碼:

def store_result(self, task_id, result, state,
                     traceback=None, request=None, **kwargs):
        """Send task return value and state."""
        routing_key, correlation_id = self.destination_for(task_id, request)
        if not routing_key:
            return
        with self.app.amqp.producer_pool.acquire(block=True) as producer:
            **if self.receive_name:
                producer.publish(
                    self._to_result(task_id, state, result, traceback, request),
                    exchange=self.exchange,
                    routing_key=**self.receive_name**,  # 指定的接收隊列
                    correlation_id=correlation_id,
                    serializer=self.serializer,
                    retry=True, retry_policy=self.retry_policy,
                    declare=self.on_reply_declare(task_id),
                    delivery_mode=self.delivery_mode,
                )
            else:
                producer.publish(
                    self._to_result(task_id, state, result, traceback, request),
                    exchange=self.exchange,
                    routing_key=**routing_key**,  # 線程建立的隊列,若直接修改為指定隊列,無效
                    correlation_id=correlation_id,
                    serializer=self.serializer,
                    retry=True, retry_policy=self.retry_policy,
                    declare=self.on_reply_declare(task_id),
                    delivery_mode=self.delivery_mode,
                )
        return result**
           
2、若是用redis存儲結果,還需要做定時任務進行查詢redis庫中的結果,再将結果取出,并修改置位符;
		定時任務用celery進行實作
           

二、基于rabbitmq、pika的消息隊列,以及結合多線程、程序相結合。