天天看點

異步任務_celery的異步任務設計建議小建議與最佳實踐更多的優化建議避免建立同步的子任務

Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。它是一個專注于實時處理的任務隊列,同時也支援任務排程。

筆者在學習這個架構的時候,發現它的許多用法代表了分布式系統設計的一些有意義的政策和方法,選取了Celery User Guide的Task章節做如下翻譯。

異步任務_celery的異步任務設計建議小建議與最佳實踐更多的優化建議避免建立同步的子任務

小建議與最佳實踐

忽略掉你不需要的任務結果

如果你不關心任務的執行結果,請確定ignore_result

是關閉的狀态,因為存儲結果将耗費時間并且占用資源。

@app.task(ignore_result=True) def mytask():     something()
           

另外,任務結果的設定還可以由task_ignore_result來指定為全局設定。

更多的優化建議

你可以在官方文檔的Optimizing Guide看到更多的優化建議。

避免建立同步的子任務

讓一個任務等待另一個任務是十分低效的,如果并行工作池的資源耗盡了,還将造成任務死鎖。

建議采用異步任務的方式,比如可以使用回調函數來實作。

  • 不好的方式:
@app.task def update_page_info(url):     page = fetch_page.delay(url).get()     info = parse_page.delay(url, page).get()     store_page_info.delay(url, info) @app.task def fetch_page(url):     return myhttplib.get(url) @app.task def parse_page(url, page):     return myparser.parse_document(page) @app.task def store_page_info(url, info):     return PageInfo.objects.create(url, info)
           
  • 好的方式:
def update_page_info(url):     # fetch_page -> parse_page -> store_page     chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)     chain() @app.task() def fetch_page(url):     return myhttplib.get(url) @app.task() def parse_page(page):     return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url):     PageInfo.objects.create(url=url, info=info)
           

這裡我将不同的任務通過簽名signature()連接配接起來,建立一個任務鍊。在Canvas: Designing Work-flows你可以看到關于任務鍊和其他強大的結構的使用方法。

預設的,celery不允許你運作同步任務,但在罕見的極端情況下你可能必須這麼做。【警告】不建議将子任務做成同步設計。

@app.task def update_page_info(url):     page = fetch_page.delay(url).get(disable_sync_subtasks=False)     info = parse_page.delay(url, page).get(disable_sync_subtasks=False)     store_page_info.delay(url, info) @app.task def fetch_page(url):     return myhttplib.get(url) @app.task def parse_page(url, page):     return myparser.parse_document(page) @app.task def store_page_info(url, info):     return PageInfo.objects.create(url, info)
           

性能與政策

顆粒度

任務顆粒度是指每個子任務所需的計算量。總體上說,把大的問題拆分成很多個小的任務,比很少的幾個長任務要好一點。因為采用小任務的方式,你可以并行執行很多小任務,并且這些小任務不會因為執行時間太長而阻塞了執行程序,使它無法運作其他正在等待的任務。

但是,執行任務是有開銷的,比如需要發送一個消息、資料不在本地(譯者注:需要到遠端請求資料或将資料存儲到遠端)等。是以如果任務粒度被拆分的過細,這些開銷很可能将使你得不償失。

參考

Art of Concurrency 這本書有一個章節介紹了關于任務開銷的問題[AOC1]。

資料位置

任務執行程序離資料越近越好。最好是在本地的記憶體裡,最差的情況,可别是從另一個大陸傳輸過來啊。如果資料離你很遠,你可以考慮在那個地方跑另一個任務執行程序。如果這都做不到,那就把常用的資料緩存起來,或者預先把一些常用資料讀過來。

在任務執行器之間共享資料的最簡單的辦法,就是使用一個分布式緩存系統,例如memcached。

參考

由Jim Gray寫的Distributed Computing Economics這篇文章是介紹資料位置這個課題的很棒的介紹。

狀态

由于celery是一個分布式系統,你不知道任務在哪個程序、或者在什麼機器上執行,你甚至不知道任務是否将及時被運作。

古老的異步名言告訴我們,“要靠每個任務來負責維護整個世界”,意思是說,在某個任務要執行的時候,這個世界可能看起來已經變化了,是以任務端要肩負起保證這個世界是他應有的樣子的責任。例如,如果你有一個任務需要對一個搜尋引擎進行重新索引,并且這個重新索引的過程需要最多5分鐘來執行,那麼必須由任務端來負責這個事情,而不是調用端。

假設以下的場景,你有一片文章,并且有一個任務是自動添加縮略語的擴寫:

class Article(models.Model):     title = models.CharField()     body = models.TextField() @app.task def expand_abbreviations(article):     article.body.replace('MyCorp', 'My Corporation')     article.save()
           

首先,有一個作者建立了一篇文章并且儲存了它,然後他點選按鈕來啟動這個自動添加縮略語擴寫的任務。

>>> article = Article.objects.get(id=102)>>> expand_abbreviations.delay(article)
           

剛好,這個隊列十分繁忙,這個任務在2分鐘内不會被執行到。這個時候,另一個作者修改了這篇文章,但這個任務仍然帶着舊的文章主體作為參數,是以當最後這個任務被運作的時候,文章就被覆寫為舊版本了。

解決這個争搶問題十分簡單,隻需要使用文章id代替文章主體或文章對象作為任務參數,并且在執行任務之前對文章重新讀取就可以了。

@app.task def expand_abbreviations(article_id):     article = Article.objects.get(id=article_id)     article.body.replace('MyCorp', 'My Corporation')     article.save()
           
>>> expand_abbreviations.delay(article_id)
           

另外,因為發送大體積的消息是十分昂貴的,采用以上的方法還可以有額外的性能提升。

資料庫事務

來看看另一個例子:

from django.db import [email protected]_on_successdef create_article(request):     article = Article.objects.create()     expand_abbreviations.delay(article.pk)
           

這是一個Django視圖,它在資料庫建立了一個文章對象,然後将主鍵傳給任務去執行。這個任務定義時使用了commit_on_success這個修飾器,作用是,當傳回視圖的時候将執行事務送出,或者視圖産生異常的時候将執行事務復原。

如果在事務完成送出之前,任務啟動了,那麼此時将造成争搶的情況。而這個時候資料庫對象還沒有被建立出來。

解決的方案是,在所有的事務被成功送出之後,使用on_commit回調函數來建立你的celery任務。

from django.db.transaction import on_commitdef create_article(request):     article = Article.objects.create()     on_commit(lambda: expand_abbreviations.delay(article.pk))
           

示例

讓我們找一個真實世界的例子:某個部落格的評論發表時,需要經過垃圾過濾,當評論建立的時候,垃圾過濾器将在背景運作,執行過濾操作,這樣使用者就不用幹等着評論過濾執行完成了。

我有一個基于Django的部落格應用,它允許使用者給部落格文章發标評論。我将介紹一下這個部落格的模型/視圖和任務子產品。

  • blog/models.py

    評論子產品看起來像這樣:

from django.db import modelsfrom django.utils.translation import ugettext_lazy as _class Comment(models.Model):     name = models.CharField(_('name'), max_length=64)     email_address = models.EmailField(_('email address'))     homepage = models.URLField(_('home page'),                                blank=True, verify_exists=False)     comment = models.TextField(_('comment'))     pub_date = models.DateTimeField(_('Published date'),                                     editable=False, auto_add_now=True)     is_spam = models.BooleanField(_('spam?'),                                   default=False, editable=False)    class Meta:         verbose_name = _('comment')         verbose_name_plural = _('comments')
           

在視圖部分,當評論發表的時候,我先将評論寫進資料庫,然後在啟動垃圾過濾任務。

  • blog/views.py
from django import forms from django.http import HttpResponseRedirect from django.template.context import RequestContext from django.shortcuts import get_object_or_404, render_to_response from blog import tasks from blog.models import Commentclass CommentForm(forms.ModelForm):     class Meta:         model = Commentdef add_comment(request, slug, template_name='comments/create.html'):     post = get_object_or_404(Entry, slug=slug)     remote_addr = request.META.get('REMOTE_ADDR')     if request.method == 'post':         form = CommentForm(request.POST, request.FILES)         if form.is_valid():             comment = form.save()             # Check spam asynchronously.                          tasks.spam_filter.delay(comment_id=comment.id, remote_addr=remote_addr)             return HttpResponseRedirect(post.get_absolute_url())     else:         form = CommentForm()     context = RequestContext(request, {'form': form})     return render_to_response(template_name, context_instance=context)
           

為了過濾垃圾評論,我用上了Akismet,這個服務以往是用在開源部落格wordpress上的。Akismet對個人使用者免費,但商用時需要付費。你必須注冊到他們的服務來擷取API key。為了調用這個Akismet,我使用了Michael Foord寫的akismet.py庫。

  • blog/tasks.py
from celery import Celery from akismet import Akismet from django.core.exceptions import ImproperlyConfigured from django.contrib.sites.models import Site from blog.models import Comment app = Celery(broker='amqp://') @app.task def spam_filter(comment_id, remote_addr=None):     logger = spam_filter.get_logger()     logger.info('Running spam filter for comment %s', comment_id)     comment = Comment.objects.get(pk=comment_id)     current_domain = Site.objects.get_current().domain     akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))     if not akismet.verify_key():         raise ImproperlyConfigured('Invalid AKISMET_KEY')     is_spam = akismet.comment_check(user_ip=remote_addr, comment_content=comment.comment, comment_author=comment.name, comment_author_email=comment.email_address)     if is_spam:         comment.is_spam = True         comment.save()
           

繼續閱讀