天天看點

celery 消息隊列與定時任務有關celeryfiles structure:1, init.py2, celeryconfig.py3, obtain_data_task.py4, obtain_model_task.py5, execution

在進行定時擷取代理ip, 黑名單ip,以及進行ip校驗,相關資訊及特征擷取時,需要用到定時任務,常見的使用 linux 的crontab 來實作定時任務。但是為了在使用django,保持設計的一緻性,就采用 celery 消息隊列的定時任務來做。

開發環境:

centos 7

python3.5

Name: celery Version: 4.0.2

mongodb, django d都是最新的

有關celery

[直接摘自參考的部落格,懶得寫]

Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程式,甚至可以被配置設定到其他主機上運作。我們通常使用它來實作異步任務( async task )和定時任務( crontab )。它的架構組成如下圖:

celery 消息隊列與定時任務有關celeryfiles structure:1, init.py2, celeryconfig.py3, obtain_data_task.py4, obtain_model_task.py5, execution

可以看到, Celery 主要包含以下幾個子產品:

任務子產品

包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發并發往任務隊列,而定時任務由 Celery Beat 程序周期性地将任務發往任務隊列。

消息中間件 Broker

Broker ,即為任務排程隊列,接收任務生産者發來的消息(即任務),将任務存入隊列。 Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。

任務執行單元 Worker

Worker 是執行任務的處理單元,它實時監控消息隊列,擷取隊列中排程的任務,并執行它。

任務結果存儲 Backend

Backend 用于存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, Redis 和 MongoDB 等。

異步任務

使用 Celery 實作異步任務主要包含三個步驟:

建立一個 Celery 執行個體

啟動 Celery Worker

應用程式調用異步任務

以下是自己寫的一個定時任務,整體結構沒問題。并能很好的運作。

檔案結構如下:

files structure:

celery_task
|- __init__.py
|- celeryconfig.py
|-obtain_data_task.py
|-obtain_model_task.py
           

從以上結構可以看出, celery_task 被配置為python的一個module

1, init.py

# coding:utf-8
from __future__ import absolute_import
from celery import Celery


app = Celery("service")
app.config_from_object("celery_task.celeryconfig")
           

2, celeryconfig.py

from __future__ import absolute_import

from datetime import timedelta

from celery.schedules import crontab

BROKER_URL = "mongodb://10.168.99.118:27017/celery_service"
# CELERY_RESULT_BACKEND = "mongodb://10.168.99.118:27017/celery/result"
# CELERY_RESULT_BACKEND = 'mongodb://10.168.99.118:27017/'
# CELERY_MONGODB_BACKEND_SETTINGS = {
#     'database': 'test',
#     'taskmeta_collection': 'result',
# }


CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_IMPORTS = (
    "celery_task.obtain_data_task",
    "celery_task.obtain_model_task"
)

CELERYBEAT_SCHEDULE = {
    "get_proxy_20_min": {
        "task": "celery_task.obtain_data_task.get_proxy",
        "schedule": timedelta(minutes=),
        # "schedule": crontab(minute="*/20"),
        "args": ()
    },
    "check_proxy_10_min": {
        "task": "celery_task.obtain_data_task.check_proxy",
        "schedule": timedelta(minutes=),
        # "schedule": crontab(minute="*/11"),
        "args": ()
    },
    "get_rbl_20_min": {
        "task": "celery_task.obtain_data_task.get_rbl",
        "schedule": timedelta(minutes=),
        # "schedule": crontab(minute="*/20"),
        "args": ()
    },
    "generate_train_set_4_day": {
        "task": "celery_task.obtain_model_task.generate_train_set",
        "schedule": timedelta(hours=),
        # "schedule": crontab(day_of_month="2-29/4"),
        "args": ()
    },
    "generate_model_day": {
        "task": "celery_task.obtain_model_task.generate_model",
        "schedule": timedelta(hours=),
        # "schedule": crontab(day_of_month="2-29/5"),
        "args": ()
    },

}
           

3, obtain_data_task.py

from __future__ import absolute_import
from celery_task import app
import os
import sys
cwd = os.path.dirname(__file__)
package_path = os.path.join(cwd, "../")
sys.path.append(package_path)

import api_extract
import ip181
import check_proxy
import raw_black_ip
import train_data

@app.task
def get_proxy():
    """
    get the raw proxy data
    then check proxy, obbtain the real useful proxy
    get the proxy items
    :return:
    """
    ae = api_extract.ApiExtraction()
    ae.extract()

    ei = ip181.ExtractIp()
    ei.extract_page()


@app.task
def check_proxy():
    """
    check proxy run in a single process
    :return:
    """
    # cp = check_proxy.CheckProxy()
    # cp.run()
    td = train_data.TrainData()
    td.get_proxy_items()


@app.task
def get_rbl():
    rbi = raw_black_ip.RawBlackIp()
    rbi.run()
    td = train_data.TrainData()
    td.get_rbl_items()

# normal ips process in a single process
           

4, obtain_model_task.py

from __future__ import absolute_import
import time
from celery_task import app
import os
import sys
cwd = os.path.dirname(__file__)
package_path = os.path.join(cwd, "../")
sys.path.append(package_path)

import train_data
import train_set
import random_forest


@app.task
def generate_train_set():
    """
    4 days
    :return:
    """
    ts = train_set.TrainSet()
    ts.generate_train_set()

@app.task
def generate_model():
    """
    5 days
    :return:
    """
    random_forest.train_rf()

           

5, execution

open shell in the direction out of celery_task, and input follow command , then press ENTER

reference docs

異步任務神器 Celery 簡明筆記

https://fast.v2ex.com/t/327228