天天看點

[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 & Mingle[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 & Mingle

[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 & Mingle

文章目錄

  • [源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 & Mingle
    • 0x00 摘要
    • 0x01 邏輯時鐘
      • 1.1 來由
      • 1.2 什麼是邏輯時鐘
      • 1.3 為什麼需要邏輯時鐘
      • 1.4 Lamport 邏輯時鐘
    • 0x02 Lamport 時鐘 in Kombu
    • 0x03 使用 clock
      • 3.1 Kombu mailbox
      • 3.2 Celery 應用
      • 3.3 EventDispatcher
    • 0x04 Mingle
      • 4.1 定義
      • 4.2 Sync 過程
        • 4.2.1 發起同步
          • 4.2.1.1 revoked task
          • 4.2.1.2 inspect.hello
        • 4.2.2 其他worker 回複
        • 4.2.3 收到後同步
        • 4.2.4 如何使用 revoked
    • 0xFF 參考

0x00 摘要

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注于實時處理的異步任務隊列,同時也支援任務排程。本文介紹 Celery 的Lamport 邏輯時鐘 & Mingle。

本文為 Celery 最後一篇。接下來有幾篇獨立文章,然後會開一個新系列,敬請期待。

全部連接配接如下:

[源碼分析] 消息隊列 Kombu 之 mailbox

[源碼分析] 消息隊列 Kombu 之 Hub

[源碼分析] 消息隊列 Kombu 之 Consumer

[源碼分析] 消息隊列 Kombu 之 Producer

[源碼分析] 消息隊列 Kombu 之 啟動過程

[源碼解析] 消息隊列 Kombu 之 基本架構

[ 源碼解析] 并行分布式架構 Celery 之架構 (1)

[ 源碼解析] 并行分布式任務隊列 Celery 之架構 (2)

[ 源碼解析] 并行分布式架構 Celery 之 worker 啟動 (1)

[源碼解析] 并行分布式架構 Celery 之 worker 啟動 (2)

[ 源碼解析] 并行分布式任務隊列 Celery 之啟動 Consumer

[ 源碼解析] 并行分布式任務隊列 Celery 之 Task是什麼

[從源碼學設計]celery 之 發送Task & AMQP

[源碼解析] 并行分布式任務隊列 Celery 之 消費動态流程

[源碼解析] 并行分布式任務隊列 Celery 之 多程序模型

[源碼分析] 分布式任務隊列 Celery 多線程模型 之 子程序

[源碼分析]并行分布式任務隊列 Celery 之 子程序處理消息

[源碼分析] 并行分布式任務隊列 Celery 之 Timer & Heartbeat

[ 源碼解析] 并行分布式任務隊列 Celery 之 EventDispatcher & Event 元件

[源碼解析] 并行分布式任務隊列 Celery 之 負載均衡

[ 源碼解析] 并行分布式架構 Celery 之 容錯機制

[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 & Mingle

0x01 邏輯時鐘

1.1 來由

分布式系統解決了傳統單體架構的單點問題和性能容量問題,另一方面也帶來了很多的問題,其中一個問題就是多節點的時間同步問題:不同機器上的實體時鐘難以同步,導緻無法區分在分布式系統中多個節點的事件時序。

1978年 Lamport 提出了邏輯時鐘的概念,來解決分布式系統中區分事件發生的時序問題。

1.2 什麼是邏輯時鐘

邏輯時鐘是為了區分現實中的實體時鐘提出來的概念,一般情況下我們提到的時間都是指實體時間,但實際上很多應用中,隻要所有機器有相同的時間就夠了,這個時間不一定要跟實際時間相同。

更進一步,如果兩個節點之間不進行互動,那麼它們的時間甚至都不需要同步。是以問題的關鍵點在于節點間的互動要在事件的發生順序上達成一緻,而不是對于時間達成一緻。

綜上,邏輯時鐘指的是分布式系統中用于區分事件的發生順序的時間機制。

1.3 為什麼需要邏輯時鐘

時間是在現實生活中是很重要的概念,有了時間我們就能比較事情發生的先後順序。如果是單個計算機内執行的事務,由于它們共享一個計時器,是以能夠很容易通過時間戳來區分先後。同理在分布式系統中也通過時間戳的方式來區分先後行不行?

答案是NO,因為在分布式系統中的不同節點間保持它們的時鐘一緻是一件不容易的事情。因為每個節點的CPU都有自己的計時器,而不同計時器之間會産生時間偏移,最終導緻不同節點上面的時間不一緻。

那麼是否可以通過某種方式來同步不同節點的實體時鐘呢?答案是有的,NTP就是常用的時間同步算法,但是即使通過算法進行同步,總會有誤差,這種誤差在某些場景下(金融分布式事務)是不能接受的。

是以,Lamport提出邏輯時鐘就是為了解決分布式系統中的時序問題,即如何定義a在b之前發生。

當且僅當事件A是由事件B引起的時候,事件A和B之間才存在一個先後關系。兩個事件可以建立因果關系的前提是:兩個事件之間可以用等于或小于光速的速度傳遞資訊。 值得注意的是這裡的因果關系指的是時序關系,即時間的前後,并不是邏輯上的原因和結果。

在分布式系統中,網絡是不可靠的,是以我們去掉可以和速度的限制,得到**兩個事件可以建立因果(時序)關系的前提是:兩個事件之間是否發生過資訊傳遞。**在分布式系統中,程序間通信的手段(共享記憶體、消息發送等)都屬于資訊傳遞。

1.4 Lamport 邏輯時鐘

分布式系統中按是否存在節點互動可分為三類事件,一類發生于節點内部,二是發送事件,三是接收事件。

邏輯時鐘定義

  • Clock Condition:對于任意事件a, b:如果a -> b(->表示a先于b發生),那麼C(a) < C(b),反之不然,因為有可能是并發事件。
  • 如果a和b都是程序Pi裡的事件,并且a在b之前,那麼Ci(a) < Ci(b)。
  • 如果a是程序Pi裡關于某消息的發送事件,b是另一程序Pj裡關于該消息的接收事件,那麼Ci(a) < Cj(b)

Lamport 邏輯時鐘原理如下:

[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 &amp; Mingle[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 &amp; Mingle
  • 每個事件對應一個Lamport時間戳,初始值為0
  • 如果事件在節點内發生,時間戳加1
  • 如果事件屬于發送事件,時間戳加1并在消息中帶上該時間戳
  • 如果事件屬于接收事件,時間戳 = Max(本地時間戳,消息中的時間戳) + 1

假設有事件a、b,C(a)、C(b)分别表示事件a、b對應的Lamport時間戳,如果a發生在b之前(happened before),記作 a -> b,則有C(a) < C(b),例如圖中有 C1 -> B1,那麼 C(C1) < C(B1)。通過該定義,事件集中Lamport時間戳不等的事件可進行比較,我們獲得事件的偏序關系(partial order)。注意:如果C(a) < C(b),并不能說明a -> b,也就是說C(a) < C(b)是a -> b的必要不充分條件

如果C(a) = C(b),那a、b事件的順序又是怎樣的?值得注意的是當C(a) = C(b)的時候,它們肯定不是因果關系,是以它們之間的先後其實并不會影響結果,我們這裡隻需要給出一種确定的方式來定義它們之間的先後就能得到全序關系。注意:Lamport邏輯時鐘隻保證因果關系(偏序)的正确性,不保證絕對時序的正确性。

0x02 Lamport 時鐘 in Kombu

在 Kombu 中,就有 Lamport 時鐘 的實作。

具體定義如下,我們可以知道:

  • 當發送消息時候,使用 forward API 來增加時鐘;
  • 當收到消息時候,使用 adjust 來調整本地時鐘;
class LamportClock:
    """Lamport's logical clock.

    A Lamport logical clock is a monotonically incrementing software counter
    maintained in each process.  It follows some simple rules:

        * A process increments its counter before each event in that process;
        * When a process sends a message, it includes its counter value with
          the message;
        * On receiving a message, the receiver process sets its counter to be
          greater than the maximum of its own value and the received value
          before it considers the message received.

    Conceptually, this logical clock can be thought of as a clock that only
    has meaning in relation to messages moving between processes.  When a
    process receives a message, it resynchronizes its logical clock with
    the sender.

    *Usage*

    When sending a message use :meth:`forward` to increment the clock,
    when receiving a message use :meth:`adjust` to sync with
    the time stamp of the incoming message.

    """

    #: The clocks current value.
    value = 0

    def __init__(self, initial_value=0, Lock=Lock):
        self.value = initial_value
        self.mutex = Lock()

    def adjust(self, other):
        with self.mutex:
            value = self.value = max(self.value, other) + 1
            return value

    def forward(self):
        with self.mutex:
            self.value += 1
            return self.value

    def sort_heap(self, h):
        if h[0][0] == h[1][0]:
            same = []
            for PN in zip(h, islice(h, 1, None)):
                if PN[0][0] != PN[1][0]:
                    break  # Prev and Next's clocks differ
                same.append(PN[0])
            # return first item sorted by process id
            return sorted(same, key=lambda event: event[1])[0]
        # clock values unique, return first item
        return h[0]

    def __str__(self):
        return str(self.value)

    def __repr__(self):
        return f'<LamportClock: {self.value}>'
           

0x03 使用 clock

3.1 Kombu mailbox

比如在 Kombu mailbox 之中,發送時候就需要攜帶本地的clock。

producer.publish(
                    reply, exchange=exchange, routing_key=routing_key,
                    declare=[exchange], headers={
                        'ticket': ticket, 'clock': self.clock.forward(),
                    }, retry=True,
                    **opts
                )
           

在收到消息時,就相應調整本地時鐘

def _collect(self, ticket,
                 limit=None, timeout=1, callback=None,
                 channel=None, accept=None):

        adjust_clock = self.clock.adjust

        def on_message(body, message):
            header = message.headers.get
            adjust_clock(header('clock') or 0)
           

3.2 Celery 應用

Celery 應用本身就有一個 LamportClock 變量。

class Celery:
        self.clock = LamportClock()
           

3.3 EventDispatcher

在 EventDispatcher 發送 Event 時候,就會使用 LamportClock 的時鐘。

def publish(self, type, fields, producer,
                blind=False, Event=Event, **kwargs):
        clock = None if blind else self.clock.forward()
        event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                      pid=self.pid, clock=clock, **fields)
        with self.mutex:
            return self._publish(event, producer,
                                 routing_key=type.replace('-', '.'), **kwargs)
           

0x04 Mingle

在 Celery 的介紹中,Mingle 主要用在啟動或者重新開機的時候,它會和其他的 worker 互動,進而進行同步。同步的資料有:

  • 其他 worker 的 clock
  • 其他 worker 已經處理掉的 tasks

同步 clock 比較好了解,但是為什麼要同步 其他worker已經處理完的 task 呢?因為這個場景是啟動或者重新開機。

如果我們在 Celery 之中設定一個節點為

task_acks_late=True

之後,那麼這個節點上正在執行的任務若是遇到斷電,運作中被結束等情況,這些任務會被重新分發到其他節點進行重試。

是以當某個節點重新開機期間,可能本來由本 worker 負責的 task 會已經被其他 worker 處理掉,為了避免重複處理,就需要同步一下。

4.1 定義

Mingle 定義如下:

class Mingle(bootsteps.StartStopStep):
    """Bootstep syncing state with neighbor workers.

    At startup, or upon consumer restart, this will:

    - Sync logical clocks.
    - Sync revoked tasks.

    """

    label = 'Mingle'
    requires = (Events,)
    compatible_transports = {'amqp', 'redis'}

    def start(self, c):
        self.sync(c)
           

4.2 Sync 過程

啟動即同步,代碼邏輯如下:

  • Mingle 向 每一個 Worker 發送 hello
  • 每個 Worker 都向 Mingle 回複自己的資訊(clock 和 tasks)
  • Mingle 更新自己的資訊

這需要注意的是:沒有回調函數,直接 send_hello 就傳回了其他 worker 的結果,這是用異步來模拟的一個同步過程。

而 在 send_hello傳回時候,因為這時候收到了所有 worker 的回複,也包括自己,是以需要把自己host對應的回複删除。

對應代碼如下:

def sync(self, c):
        replies = self.send_hello(c)
        if replies:
            [self.on_node_reply(c, nodename, reply)
             for nodename, reply in replies.items() if reply]
        else:
            info('mingle: all aone')
           

4.2.1 發起同步

首先,Mingle 會向 每一個 Worker 發送 hello。

def send_hello(self, c):
        inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
        our_revoked = c.controller.state.revoked
        replies = inspect.hello(c.hostname, our_revoked._data) or {}
        replies.pop(c.hostname, None)  # delete my own response
        return replies
           

此時相關變量如下:

c.controller.state = {module} <module 'celery.worker.state' >
    
c.controller.state.revoked = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
    
c.controller = {Worker} celery@DESKTOP-0GO3RPO
    
c = {Consumer}  
           
4.2.1.1 revoked task

我們可以看到,Mingle 會從

c.controller.state.revoked

之中擷取 内容,即 目前 worker 記錄的已被完成的 tasks。然後發送給其他 worker。

4.2.1.2 inspect.hello

這裡是使用了 celery.app.control.Control 的 inspect 功能進行廣播發送。

def _request(self, command, **kwargs):
        return self._prepare(self.app.control.broadcast(
            command,
            arguments=kwargs,
            destination=self.destination,
            callback=self.callback,
            connection=self.connection,
            limit=self.limit,
            timeout=self.timeout, reply=True,
            pattern=self.pattern, matcher=self.matcher,
        ))
           

4.2.2 其他worker 回複

celery.app.control.Control 之中,會使用 _prepare 來處理其他 worker 的傳回。

def _prepare(self, reply):
        if reply:
            by_node = flatten_reply(reply)
            if (self.destination and
                    not isinstance(self.destination, (list, tuple))):
                return by_node.get(self.destination)
            if self.pattern:
                pattern = self.pattern
                matcher = self.matcher
                return {node: reply for node, reply in by_node.items()
                        if match(node, pattern, matcher)}
            return by_node
           

4.2.3 收到後同步

在收到其他worker回複之後會進行同步,我們可以看到其同步了時鐘 和 tasks。

具體 task 的更新,是由 state 完成的。

def sync_with_node(self, c, clock=None, revoked=None, **kwargs):
        self.on_clock_event(c, clock)
        self.on_revoked_received(c, revoked)

    def on_clock_event(self, c, clock):
        c.app.clock.adjust(clock) if clock else c.app.clock.forward()

    def on_revoked_received(self, c, revoked):
        if revoked:
            c.controller.state.revoked.update(revoked)
           

4.2.4 如何使用 revoked

當釋出任務時候,如果發現該任務已經被設定為 revoked,則不會釋出該任務。

def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    .....

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        ......
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return
				...... 										
        if callbacks:
            [callback(req) for callback in callbacks]
        handle(req)
    return task_message_handler
           

0xFF 參考

分布式系統:Lamport 邏輯時鐘

6: Events 的實作

8: State 和 Result

★★★★★★關于生活和技術的思考★★★★★★

微信公衆賬号:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,可以掃描下面二維碼(或者長按識别二維碼)關注個人公衆号)。

[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 &amp; Mingle[源碼解析] 并行分布式架構 Celery 之 Lamport 邏輯時鐘 &amp; Mingle