天天看點

分布式實時處理系統在高性能計算場景下的應用

本文根據dbaplus社群第82期線上分享整理而成

講師介紹  

分布式實時處理系統在高性能計算場景下的應用

盧譽聲

autodesk資深系統研發工程師

《分布式實時處理系統:原理、架構與實作》作者,hurricane實時處理系統主要貢獻者,多部c++領域譯作。

大家好,我們今天主要讨論以下幾個問題:

機器學習與實時處理系統應用

分布式計算拓撲搭建

消息算法調優

hurricane計算架構與未來展望

一、機器學習與實時處理系統應用

現在我們先來看看第一部分:機器學習與實時處理系統應用。我們首先簡單了解下機器學習,然後引入分布式實時處理系統的概念以及實時處理系統與機器學的關系。

機器學習在現實世界中的作用越來越重要。

機器學習的方法非常多,比如傳統的知識庫方法,類比方法,歸納方法,演繹方法等各種方法。

目前在大多數領域中應用最多的當屬歸納學習方法。

在通常的歸納型機器學習中,我們的目标是讓計算機學習到一個“模型”(這種模型是人類預先組織好的,有固定的資料結構和算法等等),然後我們就可以用這個“模型”來進行“預測”。 預測就是從現實中輸入一些資料,通過學習到的模型進行計算,得到的輸出。我們希望這個模型可以在很高的機率下輸出一個和真實結果差距不大的結果。

一旦我們得到了這個模型,我們可以使用該模型處理輸入資料,得到輸出資料(即預測結果),而歸納性機器學習的任務就是學習中間的這個模型。

如果我們将這個模型看成一個函數,那麼我們可以認為歸納性機器學習的目的就是學習得到一個函數f,如果該函數的參數為x,輸出為y。那麼我們希望學到的東西就是 y = f(x) 中的f。

我們先用一個最簡單的例子來講一下:

假設我們現在不知道一個物體自由落體速度的計算公式,需要學習如何預測一個物體的自由落體速度 ,機器學習的第一步就是收集資料 。

假設我們可以測量出物體下墜的任何時間點的速度,那麼我們需要收集的資料就是某個物體的下墜時間和那個時間點的速度 。

現在我們收集到一系列資料:

時間    物體速度

1       9.7

2       20.0

3       29.0

4       39.9

5       49.4

6       58.5

7       69.0

8       78.8

9       89.0

我們這裡給出兩個假設。第一個假設是,一個物體自由落體的速度隻和時間有關系 第二個假設是,我們可以使用一個簡單的“模型”:一進制一次函數得到物體的速度。(即 f(x) = ax + b)

在這個問題中,a、b 這就是這個模型待學習的“參數”。

現在的問題就是——我們需要用什麼政策來學習這些參數?因為我們可以周遊的數值空間是無窮大的,是以我們必須采用某種政策指導我們進行學習。

我們就用非常樸素的思想來将解決這個問題吧。

在正式學習前,我們先将收集的資料分成兩組:一組是“訓練資料”,一組是“測試資料”。

假設訓練資料是:

2       20.0

測試資料是:

我們需要根據訓練資料計算出我們的參數a和b。然後使用我們計算出來的a和b預測測試資料,比較f(x)和實際資料的差距 。

如果誤差小到一定程度,說明我們學習到的參數是正确的。

比如和實際資料的差距都小于5% 。

如果滿足條件說明參數正确,否則說明參數不夠精确,需要進一步學習,這個差距,我們稱之為誤差(loss) 。

現在我們來看一下在這個模型(簡單的一進制一次線性函數)下如何學習這兩個參數:

比如我們可以采用這種學習政策 1.首先a和b都假定為整數,假定a的範圍是[-10, 10]這個區間,b的範圍是[-100, 100]這個區間 2.周遊所有的a和b的組合,使用a和b計算ax + b,x取每個訓練資料的輸入資料,評估計算結果精确性的方法是計算結果和訓練資料結果的差的絕對值除以訓練資料結果,也就是 loss = |f(x) - y| / y 3.計算每個組合的loss的平均值,取平均loss最小的為我們假定的“學習結果” 。

現在我們就得到了a和b,并且這個a和b是在我們給定範圍裡精度最高的參數,我們用這個a和b去訓練資料裡面計算平均的 |f(x) - y| / y 。如果平均loss小于 5%,說明這個a和b是符合我們精度的, 否則我們需要優化我們的學習政策。

這種樸素的基于歸納學習的機器學習方法可以分為以下幾步:

預先定義一個模型

根據模型制定學習政策

使用學習政策使用模型來學習(拟合)訓練資料,得到該模型中的所有參數

使用測試資料評估模型是否精确。如果不夠精确則根據學習政策繼續學習。如果足夠精确,我們就認為機器學習結束了。

最後我們可以得到模型和參數,這就是我們學到的結果,也就是那個用來預測的函數。

 這裡我們也要注意,上述步驟的前提是我們的模型是可以收斂的,如果模型本身就是發散的,那麼我們就永遠得不到我們的結果了。

機器學習與實時處理系統

傳統的機器學習是一種批處理式的方法,在這種方法下,我們需要預先準備好所有的訓練資料,對訓練資料進行精心組織和篩選,很多情況下還需要對資料進行标記(監督式學習),而訓練資料的組織會對最後的訓練結果産生相當大的影響。

在這種算法中我們要處理完所有資料後才能更新權重和模型。

但現在出現了許多線上學習算法,這種算法可以對實時輸入的資料進行計算,馬上完成權重和模型更新。

一方面我們可以用于監督式學習(完成資料标記後馬上加入訓練),也可以用于大量資料的非監督式學習。

而在這種情況下,實時處理系統就可以大展身手了。線上系統和實時處理系統可以確定實時完成對資料的學習,利用實時新系統。

實作思路如下圖所示:

分布式實時處理系統在高性能計算場景下的應用

這裡我們可以看到,系統接收來自其他系統的實時輸入,然後實時處理系統中使用線上算法快速處理資料,實時地更新模型權重資訊。

純粹的線上算法可能并不适合許多情景,但是如果将部分線上算法和傳統的批處理式算法結合,将會起到非常好的效果。而且許多資料分析工作确實可以通過這種方式完成一部分處理,至少是預處理。

目前機器學習的趨勢就是對精度和速度的要求越來越高,方法越來越複雜,而資料越來越多,計算量越來越大,如果沒有足夠的計算結果,不一定能夠在有限時間内完成足夠的學習,是以現在類似于tensorflow之類的機器學習解決方案都會提供針對分布式的支援。而大資料場景下的機器學習也變得越來越重要,這也對我們的分布式計算與存儲方案提出了嚴峻的挑戰。

二、分布式計算拓撲搭建

現在我們來看一個現實工程中常常會遇到的問題。

我們在開發實際系統時常常會收集大量的使用者體驗資訊,而我們常常需要對這些體驗資訊進行篩選、處理和分析。那麼我們應該如何搭建一個用于實時處理體驗資訊的分布式系統呢?

我們先來看一下整體流程:

分布式實時處理系統在高性能計算場景下的應用

收集體驗資訊

業務系統調用體驗資訊接口,将體驗資訊資訊異步寫入到特定的檔案當中。使用永不停息的體驗資訊檢測程式不斷将新生成的體驗資訊發送到資料處理伺服器。

處理體驗資訊

首先資料處理伺服器的體驗資訊接收負責将體驗資訊寫入本地的redis資料庫中。然後我們使用消息源從redis中讀取資料,再将資料發送到之後的消息處理單元,由不同的資料處理單元對體驗資訊進行不同處理。

存儲結果

消息處理單元完成體驗資訊處理之後,将體驗資訊處理結果寫入到cassandra資料庫中,并将體驗資訊資料寫入到elasticsearch資料庫中。

其中關鍵的部分就是圖中用長方形框出來的部分,該部分的作用是完成對資料的篩選、處理和基本分析。這部分我們将其稱作計算拓撲,也就是用于完成實際計算的部分。

我們接下來闡述一下每一步具體如何做。

分布式實時處理系統在高性能計算場景下的應用

收集體驗資訊分為以下幾步:

程式通過體驗資訊接口将體驗資訊寫入體驗資訊檔案中。我們假設程式會使用非阻塞的異步寫入接口,體驗資訊接口的調用方隻是将體驗資訊送入某個隊列中,然後繼續向下執行。

接着體驗資訊寫入線程從消息隊列中讀取資料,并将體驗資訊資料寫入到真正的體驗資訊檔案中。

寫入後,某一個體驗資訊代理程式會不斷監視體驗資訊檔案的改動,并将使用者新寫入的體驗資訊資訊發送到體驗資訊處理伺服器的體驗資訊收集服務接口上。

體驗資訊收集服務接口是整個服務的對外接口,負責将其他節點發送的體驗資訊資訊送入叢集内部的redis節點,并将體驗資訊資料寫入到redis的清單中。至此為止,體驗資訊收集過程就完成了。

分布式實時處理系統在高性能計算場景下的應用

接下來是處理體驗資訊,處理體驗資訊主要在計算拓撲中完成。分為四步:

體驗資訊處理消息源:負責監視redis清單的改變,從redis清單中讀取體驗資訊規則,并将體驗資訊規則文本轉換成計算拓撲的内部資料格式,傳送到下一個體驗資訊處理單元。

體驗資訊規則引擎:使用體驗資訊規則引擎對體驗資訊進行處理和過濾。這一步是可選的,也就是使用者可以加入自己的消息處理單元對收集的體驗資訊進行處理。這将會影響到發送到後續的消息處理單元(索引器和計數器)中的體驗資訊消息。這一步我們就不做處理了,如果讀者感興趣可以自己加入一個或者多個消息處理單元對體驗資訊進行處理。

索引:這一步必不可少,用于将體驗資訊規則引擎輸出的體驗資訊寫入到elasticsearch中,并便于使用者日後檢索這些體驗資訊。這裡涉及到一步——将體驗資訊規則元組轉換成json,并将json寫入elasticsearch。

統計:這一步也非常重要,用于對體驗資訊進行計數,這一步會将體驗資訊計數結果寫入cassandra的對應表中。便于使用者擷取統計資訊。

分布式實時處理系統在高性能計算場景下的應用

最後就是對計算結果的存儲,我們需要使用存儲子產品将資料寫入到不同的資料庫中:

elasticsearch:該資料庫用于存儲被轉換成json的原始體驗資訊資訊。使用者可以在elasticserach中檢索體驗資訊。

cassandra:該資料庫用于存儲體驗資訊的統計計算結果。因為cassandra支援原子計數列,是以可以非常勝任這個工作。

我們可以發現,在上面幾步中,其他都可以使用現成的系統來完成任務,最關鍵的部分就是計算拓撲,計算拓撲需要高實時性地完成體驗資訊處理分析任務,這樣才能應付大型系統中以極快速度産生的大量體驗資訊。

這裡我們可以使用一個獨立的計算叢集來完成這個事情。每個計算節點負責完成一個計算任務,完成之後将資料傳送給下一個計算節點完成後續的計算任務。每個計算節點都有一個消息隊列用于接收來自上一級的消息,然後處理消息并繼續将結果發送給下一級的計算節點。

這裡我們通常關心三個問題:

如何確定所有資料都得到了處理。

如何組織消息(資料)的傳遞,為整個叢集高效計算提供一個良好的i/o支援。

如何搭建這個計算拓撲并盡量高效地進行完成計算。

三、消息算法調優

1、如何確定所有資料都得到了處理

我們先來看一下如何解決解決資料的完全處理問題。

我們這裡講每一個需要處理的資料(一條體驗資訊記錄)組織成一個tuple,也就是元組。每個計算節點都以tuple為機關進行資料處理。每個元組都會有一個ack方法,用于告知上一級計算節點該tuple已經處理完成。

我們以下面的方式處理tuple,保證所有資料都會被完全處理:

首先給每個tuple一個id(僞随機的64位id)。

由消息源發出的tuple會有一個acker,構造tuple的時候會把新的tuple加入這個acker(就是包含這個acker)。

每個節點處理完一個元組調用元組的ack方法,改變acker内部的記錄值,表示目前tuple已經完成處理。

如果某個acker中的所有tuple都已經處理完成,那麼這個spout tuple就已經處理完成。表明該消息源發出的tuple被完全處理。

由于我們無法在acker中記錄下tuple樹,是以比較好的方式是實作一個基于異或的優化算法,該算法在storm中得到了應用。其具體實作是:在acker中設定一個ack id,每建立一個tuple,将id與其異或,每ack一個tuple時,将其與id做異或運算。這樣當所有tuple處理完成後,ack id為0,就可以知道所有元組處理完成。

如果消息源檢測到某個其發出的tuple沒有在特定時間内得到處理,就會重發該元組。後續的計算節點重新開始處理。為了實作一個同時符合cap的分布式系統,我們這裡後續的計算節點并不會緩存計算結果,而是會重新開始計算上一級節點重發的元組,具體為什麼這樣做請參見how to beat the cap theorem。

2、資料流量控制問題

我們可以設想一下,如果網絡狀況不好,在特定時間内有許多元組都沒有得到處理,那麼資料源節點就會重發許多tuple,然後後續節點繼續進行處理,産生更多的tuple,加上我們需要正常處理的tuple,使得叢集中的tuple越來越多。而由于網絡狀況不好,節點計算速度優先,會導緻叢集中積累的過多資料拖慢整個叢集的計算速度,進一步導緻更多的tuple可能計算失敗。

為了解決這個問題,我們必須想方設法控制叢集中的流量。

這個時候我們就會采用一種流量背壓機制。該機制借鑒自heron。

這個機制的思想其實很簡單,當每個計算節點處理 tuple過慢,導緻消息隊列中擠壓的tuple過多時會向其他節點發送消息,那麼所有向該節點發送消息的節點都會降低其發送消息的速度。經過逐級傳播慢慢将整個叢集的流量控制在比較合理的情況下。隻不過這個算法具體如何實作有待我們繼續研究。

3、如何搭建這個計算拓撲,盡量高效地進行完成計算

最後就是如何搭建這個拓撲,并盡量高效地完成計算了。

在分布式實時處理系統領域,目前最為成功的例子就是apache storm項目,而apache storm采用的就是一種流模型。而我們的hurricane則借鑒了storm的結構,并進行了簡化(主要在任務和線程模型上)。

這種流模型包括以下幾個概念:

拓撲結構:一個拓撲結構代表一個打包好的實時應用程式,相當于hadoop中的一個mapreduce任務。但是和mapreduce最大的不同就是,mapreduce最後會停止,相當于任務處理結束,而拓撲結構則會持續執行,永不停息,除非你手動停止。是以任何時刻流入的資料流都會被拓撲結構迅速處理。

流:一個流是拓撲結構中由元組組成的無限的序列,通常是由一個元組經過不同的處理單元處理之後産生的。每一個流入拓撲結構中的資料都會産生一個流。

元組:元組是在流中傳輸的資料,資料源會将輸入的資料轉成元組輸入到拓撲結構中,而資料處理單元會處理上一級的元組并産生新的元組傳給下一級的資料處理單元。元組中支援存儲不同類型的資料。

消息源:消息源是拓撲結構中資料流的源頭。通常其任務是讀取外部資料源輸入,并産生元組輸入拓撲結構中。可靠的資料源可以確定消息完全得到處理,并在合适的時候重發元組。

資料處理單元:資料處理單元是拓撲結構中負責處理資料的部分,你可以在其中篩選資料,統計資料,拼接資料等等。

資料處理單元會接收來自上一級的元組,并經過處理得到下一級的元組。每個資料處理單元會向上一級确認其元組有沒有得到正确處理,如果資料源發現固定時間内并不是全部元組都被處理完了,就會重發元組。

為了支撐這套模型,我們設計了hurricane的架構,該架構如下圖所示:

分布式實時處理系統在高性能計算場景下的應用

其中有以下幾個元件:

president:該元件是一個服務,是整個叢集的核心,負責完成整個叢集的排程和管理。當你需要啟動一個任務時,該節點會讀取整個進去的資訊,并将任務合理配置設定給各個計算節點。

mananger:每個計算節點都有一個mananger服務。該服務負責接收來自president的消息,并将任務交給具體的executor進行處理。當處理完成一個tuple後會将tuple發給下一個manager進行處理(發送給那個manager會在計算任務啟動時由president指定)。

executor:每個executor是一個線程,該線程會啟動一個消息循環,接收來自manager的消息,每接收到一個消息,就會調用executor内的task完成處理。每個executor會包含一個task,也就是一個計算處理任務。

task:計算處理任務,可能是産生tuple的消息源,也可以能是對tuple進行處理的消息處理單元,每個executor都會包含一個task。而storm中,一個executor中會包含多個task,我們模仿jstorm改造了這個模型,主要可以簡化task的管理和任務排程,而在jstorm中也證明這樣并不會降低叢集的處理能力。

四、展望

目前我們已經基本實作了這個架構并且能保證處理簡單的計算任務。我們需要在之後的時間中繼續完善這個架構和機制,完善并優化我們的系統實作,比如完全實作高層抽象squared和保序機制等,讓我們的系統能更接近切合實際的工程應用,而不是一個設想的空中樓閣。

除此以外,由于現在有許多計算任務需要使用基于向量和矩陣的浮點計算,是以我們計劃開發一個hurricane的子項目——sewedblas。這是一個blas庫的高層抽象,我們希望整合大量的blas庫,比如使用cpu的mkl/openblas,使用gpu的cuda和acml,建構一個易于使用、跨平台的高性能線性代數庫,并與hurricane進行深度整合,力求在分布式和科學計算、深度學習找到最好的切合點,并充分吸收整合其他現有的分布式機器學習架構,減少從科研到産品的轉換難度。

原文釋出時間為:2016-11-28

本文來自雲栖社群合作夥伴dbaplus

繼續閱讀