項目背景:
java調用python 部署的深度學習模型,java前端是用rabbitmq中的隊列send存儲客戶發送的識别請求,現在為了實作一步到位的效果,需要對rabbitmq中的指定隊列send消息進行監測,進而調用模型服務,進行消費,再将結果傳回到rabbitmq指定隊列receive,供java前端進行擷取結果。
方案如下:
一、基于celery與rabbitmq、redis
celery 進行rabbitmq隊列的send消息進行任務分發,模型進行消費,結果傳回存儲到redis或者rpc中
1、若用rpc://,則會根據線程id的建立交換機及隊列名稱,作為結果存儲,需要在使用之後,将該隊列、交換機删除,否則會導緻太多的隊列。
如圖所示:

但可設定任務執行結果儲存時限,進行自動删除,參數如下
# 任務執行結果的逾時時間
result_expires=1 * 3 * 60, # 24 * 60 * 60 # 小時、分、秒
為了滿足指定交換機、指定隊列儲存結果,需要對源碼進行修改。
以下代碼均在celery/backends/rpc.py中。
exchange = exchange or conf.result_exchange # 隊列名
exchange_type = exchange_type or conf.result_exchange_type # 配置檔案設定指定交換機名稱 為 "se_re",否則為''
self.exchange = self._create_exchange(
exchange, exchange_type, self.delivery_mode,
) # 建立交換機
def _create_exchange(self, name, type='direct', delivery_mode=2):
# uses direct to queue routing (anon exchange).
# return self.Exchange(None) 這裡沒問題 # 此處是之前的,
return self.Exchange(name, type, delivery_mode=2) # 這是改過的
@property
def binding(self):
SS = self.Queue(
self.oid, self.exchange, self.oid,
durable=False,
auto_delete=True,
expires=self.expires,
)
# 無效嘗試,還是原來的
# SS = self.Queue(
# "receive", self.exchange, "receive",
# # durable=False,
# durable=True,
# # auto_delete=True,
# auto_delete=False,
# # expires=self.expires,
# )
return SS
@cached_property
def oid(self):
# cached here is the app thread OID: name of queue we receive results on.
# 這裡緩存的是應用線程OID:我們接收結果的隊列名稱。
dd = "receive" # 無效嘗試
dd = self.app.thread_oid # 原來的代碼
return dd
為了滿足java查詢rabbitmq的需求,作為指定交換機、指定隊列,做以下修改
def __init__(self, app, connection=None, exchange=None, exchange_type=None,
persistent=None, serializer=None, auto_delete=True, **kwargs):
super().__init__(app, **kwargs)
conf = self.app.conf
self._connection = connection
self._out_of_band = {}
self.persistent = self.prepare_persistent(persistent)
self.delivery_mode = 2 if self.persistent else 1
exchange = exchange or conf.result_exchange # 配置檔案設定指定交換機名稱 為 "se_re"
exchange_type = exchange_type or conf.result_exchange_type # 配置檔案設定指定交換機類型 為 "direct"
self.exchange = self._create_exchange(
exchange, exchange_type, self.delivery_mode,
)
# 添加以下代碼,配置檔案設定指定隊列名字'receive',接收結果
***if "result_queue" in conf: # 判斷該參數是否存在配置檔案中,不存在設定為None
self.receive_name = conf.result_queue
else:
self.receive_name = None***
self.serializer = serializer or conf.result_serializer
self.auto_delete = auto_delete
self.result_consumer = self.ResultConsumer(
self, self.app, self.accept,
self._pending_results, self._pending_messages,
)
if register_after_fork is not None:
register_after_fork(self, _on_after_fork_cleanup_backend)
配置檔案設定:
還需修改結果傳回代碼:
def store_result(self, task_id, result, state,
traceback=None, request=None, **kwargs):
"""Send task return value and state."""
routing_key, correlation_id = self.destination_for(task_id, request)
if not routing_key:
return
with self.app.amqp.producer_pool.acquire(block=True) as producer:
**if self.receive_name:
producer.publish(
self._to_result(task_id, state, result, traceback, request),
exchange=self.exchange,
routing_key=**self.receive_name**, # 指定的接收隊列
correlation_id=correlation_id,
serializer=self.serializer,
retry=True, retry_policy=self.retry_policy,
declare=self.on_reply_declare(task_id),
delivery_mode=self.delivery_mode,
)
else:
producer.publish(
self._to_result(task_id, state, result, traceback, request),
exchange=self.exchange,
routing_key=**routing_key**, # 線程建立的隊列,若直接修改為指定隊列,無效
correlation_id=correlation_id,
serializer=self.serializer,
retry=True, retry_policy=self.retry_policy,
declare=self.on_reply_declare(task_id),
delivery_mode=self.delivery_mode,
)
return result**
2、若是用redis存儲結果,還需要做定時任務進行查詢redis庫中的結果,再将結果取出,并修改置位符;
定時任務用celery進行實作
二、基于rabbitmq、pika的消息隊列,以及結合多線程、程序相結合。