
Twitter Storm: Tracing the Bolt Message-Passing Path Through the Source Code

A bolt runs as a task executed by an executor. Executors are threads, so each executor must live inside a concrete process, and that process is the worker. How the supervisor creates workers, and how a worker in turn creates executor threads, is left for another time.

Assume that a spout and a bolt belonging to the same topology live in different JVMs, i.e. in different workers. The two JVMs may be on the same physical machine or on different ones; to keep the scenario simple, assume they are on different physical machines.

A message emitted by the spout passes through the following stages before arriving at the bolt as input:

1. The spout's output goes through the message-transfer thread of the worker hosting the spout, which sends the tuple to the worker hosting the bolt. The connection between the two workers is a socket, implemented with ZeroMQ.

2. The worker hosting the bolt has a dedicated receive thread for socket messages, which receives the tuple sent by the spout.

3. The receive thread hands the message to the executor that owns the target bolt. Inside a worker (i.e. within the same process), messages are passed between threads using the LMAX Disruptor pattern.

4. Once the executor receives the tuple, it is processed by the event handler.
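Before diving into the source, the four stages above can be compressed into a toy sketch (Python purely for illustration; `queue.Queue` stands in for both the ZeroMQ socket and the Disruptor queue, and every name here is invented):

```python
import queue

# Stand-in for the inter-worker ZeroMQ socket (stage 1 -> stage 2).
socket_q = queue.Queue()
# Stand-in for the bolt executor's Disruptor receive queue (stage 3).
executor_q = queue.Queue()

def spout_worker_send(task_id, message):
    """Stage 1: the spout's worker pushes a tuple over the socket."""
    socket_q.put((task_id, message))

def receive_thread():
    """Stages 2-3: the bolt's worker drains the socket and forwards
    each (task, message) pair to the owning executor's queue."""
    while not socket_q.empty():
        executor_q.put(socket_q.get())

def executor_loop(handler):
    """Stage 4: the executor's event handler processes each tuple."""
    out = []
    while not executor_q.empty():
        task, msg = executor_q.get()
        out.append(handler(task, msg))
    return out

spout_worker_send(3, "hello")
spout_worker_send(3, "world")
receive_thread()
results = executor_loop(lambda task, msg: (task, msg.upper()))
```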

Below is the relevant source code.

1. The worker creates the message-receive thread

worker.clj

(defn launch-receive-thread [worker]
  (log-message "launching receive-thread for " (:assignment-id worker) ":" (:port worker))
  (msg-loader/launch-receive-thread!
    (:mq-context worker)
    (:storm-id worker)
    (:port worker)
    (:transfer-local-fn worker)
    (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
    :kill-fn (fn [t] (halt-process! 11))))

Note the `(:mq-context worker)` line: which transport is used, ZeroMQ or another implementation, is determined by this storm.yaml setting:

storm.messaging.transport: "backtype.storm.messaging.zmq"

2. The worker receives a new message from the socket

vthread (async-loop
          (fn []
            (let [socket (.bind ^IContext context storm-id port)]
              (fn []
                (let [batched (ArrayList.)
                      init (.recv ^IConnection socket 0)]
                  (loop [packet init]
                    (let [task (if packet (.task ^TaskMessage packet))
                          message (if packet (.message ^TaskMessage packet))]
                      (if (= task -1)
                        (do (log-message "receiving-thread:[" storm-id ", " port "] received shutdown notice")
                            (.close socket)
                            nil)
                        (do
                          (when packet (.add batched [task message]))
                          (if (and packet (< (.size batched) max-buffer-size))
                            (recur (.recv ^IConnection socket 1))
                            (do (transfer-local-fn batched)
                                0))))))))))

The call to transfer-local-fn near the end of the loop hands the received TaskMessages to the appropriate executor.
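Stripped of Clojure syntax, the control flow of this receive loop is roughly the following sketch (the `-1` shutdown sentinel, the blocking/non-blocking recv flags, and the flush conditions mirror the source; the function names and test data are invented):

```python
MAX_BUFFER_SIZE = 2   # stand-in for the receiver buffer size setting
SHUTDOWN_TASK = -1    # task id -1 is the shutdown notice

def receive_loop(recv, transfer_local_fn):
    """Batch packets from the socket: the first recv blocks (flag 0),
    later recvs are non-blocking (flag 1); a batch is flushed when it
    is full or when a non-blocking recv comes back empty."""
    while True:
        batched = []
        packet = recv(0)                      # blocking read
        while True:
            if packet is not None and packet[0] == SHUTDOWN_TASK:
                return "closed"               # shutdown: close the socket
            if packet is not None:
                batched.append(packet)
            if packet is not None and len(batched) < MAX_BUFFER_SIZE:
                packet = recv(1)              # non-blocking read
            else:
                transfer_local_fn(batched)    # flush the batch
                break

# Simulated socket traffic: two tuples, one more tuple, an empty
# non-blocking read, then the shutdown notice.
packets = iter([(1, "a"), (1, "b"), (2, "c"), None, (-1, None)])
flushed = []
status = receive_loop(lambda flag: next(packets), flushed.append)
```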

3. transfer-local-fn

(defn mk-transfer-local-fn [worker]
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
        task->short-executor (:task->short-executor worker)
        task-getter (comp #(get task->short-executor %) fast-first)]
    (fn [tuple-batch]
      (let [grouped (fast-group-by task-getter tuple-batch)]
        (fast-map-iter [[short-executor pairs] grouped]
          (let [q (short-executor-receive-queue-map short-executor)]
            (if q
              (disruptor/publish q pairs)
              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
              )))))))

The Disruptor is used here to pass messages between threads.

One more remark: mk-transfer-local-fn delivers messages from the outside world to threads inside this process, whereas mk-transfer-fn goes in exactly the opposite direction.
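Conceptually, mk-transfer-local-fn is a group-by followed by a per-queue publish. A minimal sketch under that reading (plain lists stand in for Disruptor queues; all names are invented):

```python
from collections import defaultdict

def make_transfer_local_fn(task_to_executor, executor_queues):
    """Return a function that groups a batch of (task, message) pairs
    by their owning executor and publishes each group to that
    executor's queue; pairs for unknown tasks are dropped."""
    def transfer_local(tuple_batch):
        grouped = defaultdict(list)
        for task, message in tuple_batch:
            grouped[task_to_executor.get(task)].append((task, message))
        dropped = 0
        for executor, pairs in grouped.items():
            q = executor_queues.get(executor)
            if q is not None:
                q.extend(pairs)        # stand-in for disruptor/publish
            else:
                dropped += len(pairs)  # "unknown tasks, dropping"
        return dropped
    return transfer_local

q_a, q_b = [], []
transfer = make_transfer_local_fn({1: "a", 2: "a", 3: "b"},
                                  {"a": q_a, "b": q_b})
dropped = transfer([(1, "x"), (3, "y"), (2, "z"), (9, "w")])
```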

4. The message is processed by the executor

executor.clj


(defn mk-task-receiver [executor-data tuple-action-fn]
  (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
        task-ids (:task-ids executor-data)
        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
        ]
    (disruptor/clojure-handler
      (fn [tuple-batch sequence-id end-of-batch?]
        (fast-list-iter [[task-id msg] tuple-batch]
          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
            (when debug? (log-message "Processing received message " tuple))
            (if task-id
              (tuple-action-fn task-id tuple)
              ;; null task ids are broadcast tuples
              (fast-list-iter [task-id task-ids]
                (tuple-action-fn task-id tuple)
                ))
            ))))))

The tuple-action-fn invoked here is defined in mk-threads (in executor.clj). Since our example is a bolt, the tuple-action-fn that runs is the one defined in (defmethod mk-threads :bolt [executor-data task-datas]).
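The handler built by mk-task-receiver can be paraphrased as follows (a hedged sketch: Python tuples stand in for Storm's Tuple, `deserialize` and `tuple_action_fn` are placeholder callbacks; the broadcast-on-null-task-id branch mirrors the source):

```python
def make_task_receiver(task_ids, deserialize, tuple_action_fn):
    """Return an event handler that deserializes each message if it
    is not already a tuple, then dispatches it: to one task when a
    task id is present, or to every task id (broadcast) when not."""
    def on_batch(tuple_batch):
        for task_id, msg in tuple_batch:
            # Python's tuple stands in for Storm's Tuple interface.
            tup = msg if isinstance(msg, tuple) else deserialize(msg)
            if task_id is not None:
                tuple_action_fn(task_id, tup)
            else:  # null task ids are broadcast tuples
                for tid in task_ids:
                    tuple_action_fn(tid, tup)
    return on_batch

calls = []
handler = make_task_receiver(
    [1, 2],
    deserialize=lambda raw: tuple(raw.split(",")),  # toy deserializer
    tuple_action_fn=lambda tid, tup: calls.append((tid, tup)))
# One addressed serialized message, one already-deserialized broadcast.
handler([(1, "a,b"), (None, ("c",))])
```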

How, then, is mk-task-receiver hooked up to the Disruptor? See the following code, also defined in mk-threads:

(let [receive-queue (:receive-queue executor-data)
      event-handler (mk-task-receiver executor-data tuple-action-fn)]
  (disruptor/consumer-started! receive-queue)
  (fn []
    (disruptor/consume-batch-when-available receive-queue event-handler)
    0)))
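The semantics of consume-batch-when-available, as used above, amount to: block until something is in the queue, drain whatever is there, and hand it to the event handler as one batch. A toy equivalent (all names invented; `queue.Queue` again stands in for the Disruptor ring buffer):

```python
import queue

def consume_batch_when_available(q, event_handler):
    """Block for the first item, then greedily drain the rest of the
    queue and pass everything to the handler as a single batch."""
    batch = [q.get()]          # blocks until something is available
    while True:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    event_handler(batch)

recv_q = queue.Queue()
for item in [(1, "a"), (2, "b")]:
    recv_q.put(item)
seen = []
consume_batch_when_available(recv_q, seen.append)
```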

At this point, the full path a message travels, from send to receive to processing, has been traced end to end.