在進行定時擷取代理ip, 黑名單ip,以及進行ip校驗,相關資訊及特征擷取時,需要用到定時任務,常見的使用 linux 的crontab 來實作定時任務。但是為了在使用django,保持設計的一緻性,就采用 celery 消息隊列的定時任務來做。
開發環境:
centos 7
python3.5
Name: celery Version: 4.0.2
mongodb, django d都是最新的
有關celery
[直接摘自參考的部落格,懶得寫]
Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程式,甚至可以被配置設定到其他主機上運作。我們通常使用它來實作異步任務( async task )和定時任務( crontab )。它的架構組成如下圖:
可以看到, Celery 主要包含以下幾個子產品:
任務子產品
包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發并發往任務隊列,而定時任務由 Celery Beat 程序周期性地将任務發往任務隊列。
消息中間件 Broker
Broker ,即為任務排程隊列,接收任務生産者發來的消息(即任務),将任務存入隊列。 Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。
任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控消息隊列,擷取隊列中排程的任務,并執行它。
任務結果存儲 Backend
Backend 用于存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, Redis 和 MongoDB 等。
異步任務
使用 Celery 實作異步任務主要包含三個步驟:
建立一個 Celery 執行個體
啟動 Celery Worker
應用程式調用異步任務
以下是自己寫的一個定時任務,整體結構沒問題。并能很好的運作。
檔案結構如下:
files structure:
celery_task
|- __init__.py
|- celeryconfig.py
|-obtain_data_task.py
|-obtain_model_task.py
從以上結構可以看出, celery_task 被配置為python的一個module
1, init.py
# coding:utf-8
from __future__ import absolute_import
from celery import Celery
app = Celery("service")
app.config_from_object("celery_task.celeryconfig")
2, celeryconfig.py
from __future__ import absolute_import
from datetime import timedelta
from celery.schedules import crontab
BROKER_URL = "mongodb://10.168.99.118:27017/celery_service"
# CELERY_RESULT_BACKEND = "mongodb://10.168.99.118:27017/celery/result"
# CELERY_RESULT_BACKEND = 'mongodb://10.168.99.118:27017/'
# CELERY_MONGODB_BACKEND_SETTINGS = {
# 'database': 'test',
# 'taskmeta_collection': 'result',
# }
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_IMPORTS = (
"celery_task.obtain_data_task",
"celery_task.obtain_model_task"
)
CELERYBEAT_SCHEDULE = {
"get_proxy_20_min": {
"task": "celery_task.obtain_data_task.get_proxy",
"schedule": timedelta(minutes=),
# "schedule": crontab(minute="*/20"),
"args": ()
},
"check_proxy_10_min": {
"task": "celery_task.obtain_data_task.check_proxy",
"schedule": timedelta(minutes=),
# "schedule": crontab(minute="*/11"),
"args": ()
},
"get_rbl_20_min": {
"task": "celery_task.obtain_data_task.get_rbl",
"schedule": timedelta(minutes=),
# "schedule": crontab(minute="*/20"),
"args": ()
},
"generate_train_set_4_day": {
"task": "celery_task.obtain_model_task.generate_train_set",
"schedule": timedelta(hours=),
# "schedule": crontab(day_of_month="2-29/4"),
"args": ()
},
"generate_model_day": {
"task": "celery_task.obtain_model_task.generate_model",
"schedule": timedelta(hours=),
# "schedule": crontab(day_of_month="2-29/5"),
"args": ()
},
}
3, obtain_data_task.py
from __future__ import absolute_import
from celery_task import app
import os
import sys
cwd = os.path.dirname(__file__)
package_path = os.path.join(cwd, "../")
sys.path.append(package_path)
import api_extract
import ip181
import check_proxy
import raw_black_ip
import train_data
@app.task
def get_proxy():
"""
get the raw proxy data
then check proxy, obbtain the real useful proxy
get the proxy items
:return:
"""
ae = api_extract.ApiExtraction()
ae.extract()
ei = ip181.ExtractIp()
ei.extract_page()
@app.task
def check_proxy():
"""
check proxy run in a single process
:return:
"""
# cp = check_proxy.CheckProxy()
# cp.run()
td = train_data.TrainData()
td.get_proxy_items()
@app.task
def get_rbl():
rbi = raw_black_ip.RawBlackIp()
rbi.run()
td = train_data.TrainData()
td.get_rbl_items()
# normal ips process in a single process
4, obtain_model_task.py
from __future__ import absolute_import
import time
from celery_task import app
import os
import sys
cwd = os.path.dirname(__file__)
package_path = os.path.join(cwd, "../")
sys.path.append(package_path)
import train_data
import train_set
import random_forest
@app.task
def generate_train_set():
"""
4 days
:return:
"""
ts = train_set.TrainSet()
ts.generate_train_set()
@app.task
def generate_model():
"""
5 days
:return:
"""
random_forest.train_rf()
5, execution
open shell in the direction out of celery_task, and input follow command , then press ENTER
reference docs
異步任務神器 Celery 簡明筆記
https://fast.v2ex.com/t/327228