天天看點

最近分布式系統開發小結: Slave子產品Executors設計1. Slave總體設計2. Slave 詳細設計3. Slave子產品總結

更新一段我在linkedin上對這個項目的描述,目前項目已經開發完在使用了。本文并不是最新的設計。

背景

解決hdfs/hive/rdbms/ftp/mongodb等資料源之間的批量資料同步問題

特性

跨機房場景下的鍊路優化;多路輸入和輸出的任務模型;資料容錯和可持久化;任務失敗恢複

任務排程

把任務配置解析為實體執行計劃,master控制任務的排程和失敗恢複,基于mesos完成資源配置設定和任務排程。slave分布在各個資料中心,具體傳輸任務的調起做到鍊路優化選擇。高并發場景下,增加mesos slave節點來保證可擴充性(cpu和mem資源),master将中繼資料記錄在zk上,并通過争搶zk鎖實作互備。

資料傳輸

傳輸元件分為input、cache和output三種executor,各自程序内通過雙隊列優化傳輸速度。資料以bundle為機關傳輸,通常上百行為一個bundle且可壓縮,netty作網絡通信。input端異步備份一份資料在bookkeeper,cache使用beanstalkd做消息隊列,output端處理bundle成功或失敗,會有守護線程異步删除或更新beanstalkd内的message(類似storm topology裡的ack),executors會把bundle傳輸狀态更新在zookkeeper上,某一executor挂掉都可以在一台slave上重新調起并恢複任務繼續進行。input和output端的reader和writer是插件化的。

====================================  我是更新線 ====================================

之前在最近分布式系統開發小結裡,提到了一個在開發中的系統的大緻設計,本文是我負責部分的一個詳細設計。在閱讀本文前可以先浏覽下之前那篇文章,對于系統的功能和概況有個基本了解。

slave子產品主要需要實作不同的mesos executors,包括input, memorystorage和output三種executor。每個dpump任務會由scheduler manager經過邏輯執行計劃和實體執行計劃的拆分,從knowledge center擷取知識,最終将切分後的task配置設定給相應的slaves執行,并通過mesos master,配置設定資源并調起slave上的各自的executor。三種executors的執行邏輯圖如下。

最近分布式系統開發小結: Slave子產品Executors設計1. Slave總體設計2. Slave 詳細設計3. Slave子產品總結

資料通過bundle形式在三種executor之間的流通,每個bundle有唯一id、一個string[]、以及一個index。index用于标記每個bundle最後資料輸出的最新成功行,即我們容錯粒度控制在行級别。對input、cache、output作一個簡單介紹:

 input,也叫reader。每個task内隻有一個input executor,負責從資料源(hdfs、ftp、mysql、mongodb等)讀出資料,将資料經過切分、處理、壓縮後通過netty流式傳輸給memorystorage。

cache,也叫memorystorage。每個task内隻有一個cache executor,負責從input端接收bundle,将bundle存取往一個隊列内,當有output連接配接的時候,将bundle取出輸送給output

output,也叫writer。每個task可能有多個output executor,負責将資料最終輸出到資料目的源。output從cache端得到bundle的過程也是流式的。

整個task的流通都是流式的,且slave之間的網絡通信使用的是netty這個nio架構,傳輸過程中還涉及到bundle高效的正反序列化和壓縮、解壓縮。最重要的一點是input、cache、output三個部分各自都有容錯設計,其中input和output通過向zookeeper記錄和擷取bundle狀态保證處理bundle的不重不漏,而cache通過對隊列内消息内容的鈍化,保證自身已儲存的bundle不丢失,并能在新的cache executor起來後,可以繼續為output提供bundle輸出。

下面詳細介紹三種executor的設計,閱讀過程中請參考這張task程序圖。

最近分布式系統開發小結: Slave子產品Executors設計1. Slave總體設計2. Slave 詳細設計3. Slave子產品總結

每個input負責一次job(每個job對應多個tasks)内最小粒度的檔案塊讀取,比如可能是一個hdfs block,一張hive表的一個分區甚至是一張mysql表。

input内還分有writer、buffer(雙隊列)和reader。writer是一個單線程,從資料源擷取資料并切分好bundle,每個bundle有唯一id和定長的字元串數組,然後将bundle存入雙隊列的輸入頭,在雙隊列的讀出頭有若幹個reader線程搶占bundle,每個reader擷取到bundle後釋放鎖并做二次處理、壓縮,最終reader通過netty client将bundle包裝成一個傳輸格式,以二進制流的方式通過channel流向cache。

writer端切分bundle保證了從同一個資料源的同份檔案塊讀取資料生成bundle是有序的,每次netty往channel裡寫入一份bundle的時候,會通過companion線程異步更新此task下znode内的bitmap,該bitmap标記每個bundle在input端是否被傳輸。每次input啟動的時候,netty會讀取znode上的bitmap緩存在記憶體裡,發送bundle前根據id作一次校對。是以當input挂掉或重新開機時,可以保證發送給cache的bundle不重不漏。

cache本身是一個netty server,接收input和output多個netty client的連接配接,并對不同的channel做不同event處理。cache executor需要一個多狀态的消息隊列,這裡采用的是beanstalkd隊列,下圖為該beanstalkd内消息(job)的狀态變化圖。

最近分布式系統開發小結: Slave子產品Executors設計1. Slave總體設計2. Slave 詳細設計3. Slave子產品總結

 每次cache将新的bundle put進beanstalkd的時候需要選擇一個tube(管道),beanstalkd可以開啟多個獨立的tube,tube記憶體放jobs,每個job有自己唯一的job id,而job消息體就是我們的bundlebytes(bundle存入cache直接存的就是序列化後的byte[])。

每個job存入queue後是ready狀态,被reserve之後,就不能被用戶端再次擷取到,即cache每次會從每個tube裡按順序reserve一個job,并發送消息體給output(一個output對應一個tube),這個過程保證每個job被消費一次,且隻能被一個output消費。如果output端消費成功,則該job會被delete掉;如果該job消費失敗,則會被重新置為ready,重新置為ready可能是因為逾時(每個job被reserve的時候都有一個time-to-run時間設定)了,也可以是用戶端release掉該job。

這裡,對于tube内job的後續處理交給acker這個線程來做。acker的設計靈感來源于storm。storm topology内每個bolt對tuple的執行和處理最終都會給spout一個ack響應,而拓撲過程中整棵tuple樹的成功/失敗執行狀态會由acker守護程序進行跟蹤,以此來保證每個tuple被完全處理,而acker對tuple的跟蹤算法是storm的主要突破之一。

cache端的acker線程會監聽zookeeper上znode樹上各個節點的事件變化,進而掌握被output消費的所有bundle的最後狀态,對應地删除、釋放,或者更新queue裡的job。需要注意的是這裡還涉及到一個更新job的過程。前面提到bundle内維護了一個index,而output消費bundle的時候,如果是資料行寫了一半出現了異常或者挂掉了,我們需要記錄bundle内資料行的最新index并将此資訊也記錄在znode上。對于這種最壞情況,acker負責将該fail的job從queue裡delete掉,并更改job内bundle bytes内容,重置新的index,再把新的job put進queue裡。這是我們最不希望看到的情況,同時也是我們對bundle能做的最細粒度的容錯設計。

beanstalkd啟動之後可以打開binlog開關,binlog是beanstalkd容錯恢複的機制,将記憶體裡的消息隊列結構映射到硬碟上。對于cache的容錯設計,直覺的辦法在于将這份binlog存在nfs或hdfs上,來保證cache挂掉重新開機後,能擷取到之前儲存的bundle資料,繼續提供服務。

output在最終的bundle消費階段,會把資料導向新的資料源。每個output擷取的bundle來自于cache裡的一個tube,而每個bundle的執行情況也會由companion線程異步更新到zookeeper上。

 對于output來說,它隻需要關心從cache端擷取的每個bundle都照常處理就可以了,不需要關心這個bundle之前是否被消費過,被消費到哪裡。原因在于,cache端的job狀态的變更和job的更新可以由acker保障,而acker是從zk上得到這些job的狀态并對queue異步更新。如果acker挂了,隻要重新起一個線程擷取znode上最新的狀态就可以了。對于output來說,能傳過來的bundle,對應到queue裡就是ready狀态的job,這個job可能被消費過了,但是他的index也是以得到了更新,output端對于所有bundle的處理是一緻的,唯一需要關心的是output需要把bundle的資訊異步更新給zk,如果output挂了,重新起一個output接着從cache讀bundle就可以了。

slave子產品三種executor的設計,主要考慮的是各個executor挂掉之後,怎樣保證資料處理的不重複和不遺漏。我們依賴zookeeper的可靠性,記錄、更新、判斷bundle的狀态,做到input、cache、output各司其職,最到最小粒度的容錯。executor本身的失敗和重新開機則由mesos保障,mesos作為資源管理系統,由master監控slave上各個executor的執行狀況,通過回調,可以在合适的slave上再次啟動挂掉的executor程序,保證業務task的順利進行。

(全文完)