文章目錄
- 第十八章_後端架構選型、離線及實時計算
-
- 18.1 為什麼需要分布式計算?
- 18.2 目前有哪些深度學習分布式計算架構?
-
- 18.2.1 PaddlePaddle
- 18.2.2 Deeplearning4j
- 18.2.3 Mahout
- 18.2.4 Spark MLllib
- 18.2.5 Ray
- 18.2.6 Spark stream
- 18.2.7 Horovod
- 18.2.8 BigDL
- 18.2.9 Petastorm
- 18.2.10 TensorFlowOnSpark
- 18.3 如何選擇合适的分布式計算架構進行模型訓練?
- 18.4 如何進行實時計算?
-
- 18.4.1 什麼是實時流計算?
- 18.4.2 實時流計算過程
- 18.5 如何進行離線計算?
- 18.6 如何使用分布式架構提高模型訓練速度?
- 18.7 深度學習分布式計算架構如何在移動網際網路中應用?
- 18.8 如何在個性化推薦中應用深度學習分布式架構?
- 18.9 如何評價個性化推薦系統的效果?
-
- 18.9.1 準确率與召回率(Precision & Recall)
- 18.9.2 綜合評價名額(F-Measure)
- 18.9.3 E值
- 18.9.4 平均正确率(Average Precision)
- 18.10 參考文獻
第十八章_後端架構選型、離線及實時計算
Markdown Revision 1;
Date: 2018/11/11
Editor: 梁志成
Contact: [email protected]
18.1 為什麼需要分布式計算?
在這個資料爆炸的時代,産生的資料量不斷地在攀升,從GB,TB,PB,ZB.挖掘其中資料的價值也是企業在不斷地追求的終極目标。但是要想對海量的資料進行挖掘,首先要考慮的就是海量資料的存儲問題,比如Tb量級的資料。
談到資料的存儲,則不得不說的是磁盤的資料讀寫速度問題。早在上個世紀90年代初期,普通硬碟的可以存儲的容量大概是1G左右,硬碟的讀取速度大概為4.4MB/s.讀取一張硬碟大概需要5分鐘時間,但是如今硬碟的容量都在1TB左右了,相比擴充了近千倍。但是硬碟的讀取速度大概是100MB/s。讀完一個硬碟所需要的時間大概是2.5個小時。是以如果是基于TB級别的資料進行分析的話,光硬碟讀取完資料都要好幾天了,更談不上計算分析了。那麼該如何處理大資料的存儲,計算分析呢?
一個很簡單的減少資料讀寫時間的方法就是同時從多個硬碟上讀寫資料,比如,如果我們有100個硬碟,每個硬碟存儲1%的資料 ,并行讀取,那麼不到兩分鐘就可以完成之前需要2.5小時的資料讀寫任務了。這就是大資料中的分布式存儲的模型。當然實作分布式存儲還需要解決很多問題,比如硬體故障的問題,使用多台主機進行分布式存儲時,若主機故障,會出現資料丢失的問題,是以有了副本機制:系統中儲存資料的副本。一旦有系統發生故障,就可以使用另外的副本進行替換(著名的RAID備援磁盤陣列就是按這個原理實作的)。其次比如一個很大的檔案如何進行拆分存儲,讀取拆分以後的檔案如何進行校驗都是要考慮的問題。比如我們使用Hadoop中的HDFS也面臨這個問題,隻是架構給我們實作了這些問題的解決辦法,開發中開發者不用考慮這些問題,底層架構已經實作了封裝。
同樣假如有一個10TB的檔案,我們要統計其中某個關鍵字的出現次數,傳統的做法是周遊整個檔案,然後統計出關鍵字的出現次數,這樣效率會特别特别低。基于分布式存儲以後,資料被分布式存儲在不同的伺服器上,那麼我們就可以使用分布式計算架構(比如MapReduce,Spark等)來進行并行計算(或者說是分布式計算),即:每個伺服器上分别統計自己存儲的資料中關鍵字出現的次數,最後進行一次彙總,那麼假如資料分布在100台伺服器上,即同時100台伺服器同時進行關鍵字統計工作,效率一下子可以提高幾十倍。
18.2 目前有哪些深度學習分布式計算架構?
18.2.1 PaddlePaddle
PaddlePaddle【1】是百度開源的一個深度學習平台。PaddlePaddle為深度學習研究人員提供了豐富的API,可以輕松地完成神經網絡配置,模型訓練等任務。
官方文檔中簡易介紹了如何使用架構在
- 線性回歸
- 識别數字
- 圖像分類
- 詞向量
- 個性化推薦
- 情感分析
- 語義角色标注
- 機器翻譯
等方面的應用
Github位址:https://github.com/PaddlePaddle/Paddle
18.2.2 Deeplearning4j
DeepLearning4J(DL4J)【2】是一套基于Java語言的神經網絡工具包,可以建構、定型和部署神經網絡。DL4J與Hadoop和Spark內建,支援分布式CPU和GPU。
Deeplearning4j包括了分布式、多線程的深度學習架構,以及普通的單線程深度學習架構。定型過程以叢集進行,也就是說,Deeplearning4j可以快速處理大量資料。Deeplearning4j在開放堆棧中作為子產品元件的功能,使之成為為微服務架構打造的深度學習架構。
Deeplearning4j從各類淺層網絡出發,設計深層神經網絡。這一靈活性使使用者可以根據所需,在分布式、生産級、能夠在分布式CPU或GPU的基礎上與Spark和Hadoop協同工作的架構内,整合受限玻爾茲曼機、其他自動編碼器、卷積網絡或遞歸網絡。
Deeplearning4j在已建立的各個庫及其在系統整體中的所處位置
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-QFPSbsJa-1575791675394)(./img/18-2-2.png)]
Github位址:https://github.com/deeplearning4j/deeplearning4j
18.2.3 Mahout
Mahout【3】是基于Hadoop的機器學習和資料挖掘的一個分布式架構。Mahout用MapReduce實作了部分資料挖掘算法,解決了并行挖掘的問題。
Mahout包含許多實作,包括聚類、分類、推薦過濾、頻繁子項挖掘等。
Mahout算法庫:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-o4i3wYdN-1575791675395)(./img/18-2-3-1.png)]
Mahout應用場景:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-hYo7jlSP-1575791675397)(./img/18-2-3-2.png)]
Github位址:https://github.com/apache/mahout
18.2.4 Spark MLllib
MLlib(Machine Learnig lib) 【4】是Spark對常用的機器學習算法的實作庫,同時包括相關的測試和資料生成器。
MLlib是MLBase一部分,其中MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。
- ML Optimizer會選擇它認為最适合的已經在内部實作好了的機器學習算法和相關參數,來處理使用者輸入的資料,并傳回模型或别的幫助分析的結果;
- MLI 是一個進行特征抽取和進階ML程式設計抽象的算法實作的API或平台;
- MLlib是Spark實作一些常見的機器學習算法和實用程式,包括分類、回歸、聚類、協同過濾、降維以及底層優化,該算法可以進行可擴充; MLRuntime 基于Spark計算架構,将Spark的分布式計算應用到機器學習領域。
MLlib主要包含三個部分:
- 底層基礎:包括Spark的運作庫、矩陣庫和向量庫
- 算法庫:包含廣義線性模型、推薦系統、聚類、決策樹和評估的算法
- 實用程式:包括測試資料的生成、外部資料的讀入等功能
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ZjfPTZE7-1575791675399)(./img/18-2-4-1.png)]
架構圖
MLlib目前支援4種常見的機器學習問題: 分類、回歸、聚類和協同過濾,MLlib在Spark整個生态系統中的位置如圖下圖所示。
18.2.5 Ray
Ray【5】是加州大學伯克利分校實時智能安全執行實驗室(RISELab)的研究人員針對機器學習領域開發的一種新的分布式計算架構,該架構旨在讓基于Python的機器學習和深度學習工作負載能夠實時執行,并具有類似消息傳遞接口(MPI)的性能和細粒度。
增強學習的場景,按照原理定義,因為沒有預先可用的靜态标簽資訊,是以通常需要引入實際的目标系統(為了加快訓練,往往是目标系統的模拟環境)來擷取回報資訊,用做損失/收益判斷,進而完成整個訓練過程的閉環回報。典型的步驟是通過觀察特定目标系統的狀态,收集回報資訊,判斷收益,用這些資訊來調整參數,訓練模型,并根據新的訓練結果産出可用于調整目标系統的行為Action,輸出到目标系統,進而影響目标系統狀态變化,完成閉環,如此反複疊代,最終目标是追求某種收益的最大化(比如對AlphoGo來說,收益是赢得一盤圍棋的比賽)。
在這個過程中,一方面,模拟目标系統,收集狀态和回報資訊,判斷收益,訓練參數,生成Action等等行為可能涉及大量的任務和計算(為了選擇最佳Action,可能要并發模拟衆多可能的行為)。而這些行為本身可能也是千差萬别的異構的任務,任務執行的時間也可能長短不一,執行過程有些可能要求同步,也有些可能更适合異步。
另一方面,整個任務流程的DAG圖也可能是動态變化的,系統往往可能需要根據前一個環節的結果,調整下一個環節的行為參數或者流程。這種調整,可能是目标系統的需要(比如在自動駕駛過程中遇到行人了,那麼我們可能需要模拟計算刹車的距離來判斷該采取的行動是刹車還是拐彎,而平時可能不需要這個環節),也可能是增強學習特定訓練算法的需要(比如根據多個并行訓練的模型的目前收益,調整模型超參數,替換模型等等)。
此外,由于所涉及到的目标系統可能是具體的,現實實體世界中的系統,是以對時效性也可能是有強要求的。舉個例子,比如你想要實作的系統是用來控制機器人行走,或者是用來打視訊遊戲的。那麼整個閉環回報流程就需要在特定的時間限制内完成(比如毫秒級别)。
總結來說,就是增強學習的場景,對分布式計算架構的任務排程延遲,吞吐量和動态修改DAG圖的能力都可能有很高的要求。按照官方的設計目标,Ray需要支援異構計算任務,動态計算鍊路,毫秒級别延遲和每秒排程百萬級别任務的能力。
Ray的目标問題,主要是在類似增強學習這樣的場景中所遇到的工程問題。那麼增強學習的場景和普通的機器學習,深度學習的場景又有什麼不同呢?簡單來說,就是對整個處理鍊路流程的時效性和靈活性有更高的要求。
Ray架構優點
- 海量任務排程能力
- 毫秒級别的延遲
- 異構任務的支援
- 任務拓撲圖動态修改的能力
Ray沒有采用中心任務排程的方案,而是采用了類似層級(hierarchy)排程的方案,除了一個全局的中心排程服務節點(實際上這個中心排程節點也是可以水準拓展的),任務的排程也可以在具體的執行任務的工作節點上,由本地排程服務來管理和執行。
與傳統的層級排程方案,至上而下配置設定排程任務的方式不同的是,Ray采用了至下而上的排程政策。也就是說,任務排程的發起,并不是先送出給全局的中心排程器統籌規劃以後再分發給次級排程器的。而是由任務執行節點直接送出給本地的排程器,本地的排程器如果能滿足該任務的排程需求就直接完成排程請求,在無法滿足的情況下,才會送出給全局排程器,由全局排程器協調轉發給有能力滿足需求的另外一個節點上的本地排程器去排程執行。
架構設計一方面減少了跨節點的RPC開銷,另一方面也能規避中心節點的瓶頸問題。當然缺點也不是沒有,由于缺乏全局的任務視圖,無法進行全局規劃,是以任務的拓撲邏輯結構也就未必是最優的了。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-MtpuCcGo-1575791675400)(./img/18-2-5-1.png)]
架構圖
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-kraL5RCL-1575791675401)(./img/18-2-5-2.png)]
任務排程圖
Ray架構現狀:
- API層以上 的部分還比較薄弱,Core子產品核心邏輯估需要時間打磨。
- 國内目前除了螞蟻金服和RISELab有針對性的合作以外,關注程度還很低,沒有實際的應用執行個體看到,整體來說還處于比較早期的架構建構階段。
Github位址:https://github.com/ray-project/ray
18.2.6 Spark stream
随着大資料的發展,人們對大資料的處理要求也越來越高,原有的批處理架構MapReduce适合離線計算,卻無法滿足實時性要求較高的業務,如實時推薦、使用者行為分析等。 Spark Streaming是建立在Spark上的實時計算架構,通過它提供的豐富的API、基于記憶體的高速執行引擎,使用者可以結合流式、批處理和互動試查詢應用。
Spark是一個類似于MapReduce的分布式計算架構,其核心是彈性分布式資料集,提供了比MapReduce更豐富的模型,可以在快速在記憶體中對資料集進行多次疊代,以支援複雜的資料挖掘算法和圖形計算算法。Spark Streaming【6】是一種建構在Spark上的實時計算架構,它擴充了Spark處理大規模流式資料的能力。
Spark Streaming的優勢在于:
- 能運作在100+的結點上,并達到秒級延遲。
- 使用基于記憶體的Spark作為執行引擎,具有高效和容錯的特性。
- 能內建Spark的批處理和互動查詢。
- 為實作複雜的算法提供和批處理類似的簡單接口。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-Wur7O7Z6-1575791675402)(./img/18-2-6-1.png)]
Spark Streaming架構圖
Spark Streaming把實時輸入資料流以時間片Δt (如1秒)為機關切分成塊。Spark Streaming會把每塊資料作為一個RDD,并使用RDD操作處理每一小塊資料。每個塊都會生成一個Spark Job處理,最終結果也傳回多塊。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-R0pqAPiC-1575791675405)(./img/18-2-6-2.png)]
Spark Streaming基本原理圖
正如Spark Streaming最初的目标一樣,它通過豐富的API和基于記憶體的高速計算引擎讓使用者可以結合流式處理,批處理和互動查詢等應用。是以Spark Streaming适合一些需要曆史資料和實時資料結合分析的應用場合。當然,對于實時性要求不是特别高的應用也能完全勝任。另外通過RDD的資料重用機制可以得到更高效的容錯處理。
18.2.7 Horovod
Horovod【7】 是 Uber 開源的又一個深度學習工具,它的發展吸取了 Facebook「一小時訓練 ImageNet 論文」與百度 Ring Allreduce 的優點,可為使用者實作分布式訓練提供幫助。
Horovod 支援通過用于高性能并行計算的低層次接口 – 消息傳遞接口 (MPI) 進行分布式模型訓練。有了 MPI,就可以利用分布式 Kubernetes 叢集來訓練 TensorFlow 和 PyTorch 模型。
分布式 TensorFlow 的參數伺服器模型(parameter server paradigm)通常需要對大量樣闆代碼進行認真的實作。但是 Horovod 僅需要幾行。下面是一個分布式 TensorFlow 項目使用 Horovod 的示例:
import tensorflow as tf
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())
# Build model…
loss = …
opt = tf.train.AdagradOptimizer(0.01)
# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)
# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
# Make training operation
train_op = opt.minimize(loss)
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=“/tmp/train_logs”,
config=config,
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# Perform synchronous training.
mon_sess.run(train_op)
在該示例中,粗體文字指進行單個 GPU 分布式項目時必須做的改變:
- hvd.init() 初始化 Horovod。
- config.gpu_options.visible_device_list = str(hvd.local_rank()) 向每個 TensorFlow 流程配置設定一個 GPU。
- opt=hvd.DistributedOptimizer(opt) 使用 Horovod 優化器包裹每一個正常 TensorFlow 優化器,Horovod 優化器使用 ring-allreduce 平均梯度。
- hvd.BroadcastGlobalVariablesHook(0) 将變量從第一個流程向其他流程傳播,以實作一緻性初始化。如果該項目無法使用 MonitoredTrainingSession,則使用者可以運作 hvd.broadcast_global_variables(0)。
之後,可以使用 mpirun 指令使該項目的多個拷貝在多個伺服器中運作:
$ mpirun -np 16 -x LD_LIBRARY_PATH -H
server1:4,server2:4,server3:4,server4:4 python train.py
mpirun 指令向四個節點分布 train.py,然後在每個節點的四個 GPU 上運作 train.py。
Github位址:https://github.com/uber/horovod
18.2.8 BigDL
BigDL【9】是一種基于Apache Spark的分布式深度學習架構。它可以無縫的直接運作在現有的Apache Spark和Hadoop叢集之上。BigDL的設計吸取了Torch架構許多方面的知識,為深度學習提供了全面的支援;包括數值計算和進階神經網絡;借助現有的Spark叢集來運作深度學習計算,并簡化存儲在Hadoop中的大資料集的資料加載。
BigDL優點:
- 豐富的深度學習支援。模拟Torch之後,BigDL為深入學習提供全面支援,包括數字計算(通過Tensor)和進階神經網絡 ; 此外,使用者可以使用BigDL将預先訓練好的Caffe或Torch模型加載到Spark程式中。
- 極高的性能。為了實作高性能,BigDL在每個Spark任務中使用英特爾MKL和多線程程式設計。是以,在單節點Xeon(即與主流GPU 相當)上,它比開箱即用開源Caffe,Torch或TensorFlow快了數量級。
- 有效地橫向擴充。BigDL可以通過利用Apache Spark(快速分布式資料處理架構),以及高效實施同步SGD和全面減少Spark的通信,進而有效地擴充到“大資料規模”上的資料分析
BigDL缺點:
- 對機器要求高 jdk7上運作性能差 在CentOS 6和7上,要将最大使用者程序增加到更大的值(例如514585); 否則,可能會看到錯誤,如“無法建立新的本機線程”。
- 訓練和驗證的資料會加載到記憶體,擠占記憶體
BigDL滿足的應用場景:
- 直接在Hadoop/Spark架構下使用深度學習進行大資料分析(即将資料存儲在HDFS、HBase、Hive等資料庫上);
- 在Spark程式中/工作流中加入深度學習功能;
- 利用現有的 Hadoop/Spark 叢集來運作深度學習程式,然後将代碼與其他的應用場景進行動态共享,例如ETL(Extract、Transform、Load,即通常所說的資料抽取)、資料倉庫(data warehouse)、功能引擎、經典機器學習、圖表分析等。
18.2.9 Petastorm
Petastorm是一個由 Uber ATG 開發的開源資料通路庫。這個庫可以直接基于數 TB Parquet 格式的資料集進行單機或分布式訓練和深度學習模型評估。Petastorm 支援基于 Python 的機器學習架構,如 Tensorflow、Pytorch 和 PySpark,也可以直接用在 Python 代碼中。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-3Je3GkFN-1575791675406)(./img/18-2-9-1.png)]
深度學習叢集
即使是在現代硬體上訓練深度模型也很耗時,而且在很多情況下,很有必要在多台機器上配置設定訓練負載。典型的深度學習叢集需要執行以下幾個步驟:
- 一台或多台機器讀取集中式或本地資料集。
- 每台機器計算損失函數的值,并根據模型參數計算梯度。在這一步通常會使用 GPU。
- 通過組合估計的梯度(通常由多台機器以分布式的方式計算得出)來更新模型系數。
通常,一個資料集是通過連接配接多個資料源的記錄而生成的。這個由 Apache Spark 的 Python 接口 PySpark 生成的資料集稍後将被用在機器學習訓練中。Petastorm 提供了一個簡單的功能,使用 Petastorm 特定的中繼資料對标準的 Parquet 進行了擴充,進而讓它可以與 Petastorm 相容。
有了 Petastorm,消費資料就像在 HDFS 或檔案系統中建立和疊代讀取對象一樣簡單。Petastorm 使用 PyArrow 來讀取 Parquet 檔案。
将多個資料源組合到單個表格結構中,進而生成資料集。可以多次使用相同的資料集進行模型訓練和評估。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-3gMwvCHH-1575791675408)(./img/18-2-9-2.png)]
深度學習叢集
為分布式訓練進行分片
在分布式訓練環境中,每個程序通常負責訓練資料的一個子集。一個程序的資料子集與其他程序的資料子集正交。Petastorm 支援将資料集的讀時分片轉換為正交的樣本集。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-LwE2vXXe-1575791675408)(./img/18-2-9-3.png)]
Petastorm 将資料集的非重疊子集提供給參與分布式訓練的不同機器
本地緩存
Petastorm 支援在本地存儲中緩存資料。當網絡連接配接速度較慢或帶寬很昂貴時,這會派上用場。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-CF1lGF97-1575791675410)(./img/18-2-9-4.png)]
Github位址:https://github.com/uber/petastorm
18.2.10 TensorFlowOnSpark
TensorFlowOnSpark【10】為 Apache Hadoop 和 Apache Spark 叢集帶來可擴充的深度學習。 通過結合深入學習架構 TensorFlow 和大資料架構 Apache Spark 、Apache Hadoop 的顯着特征,TensorFlowOnSpark 能夠在 GPU 和 CPU 伺服器叢集上實作分布式深度學習。
滿足的應用場景:
為了利用TensorFlow在現有的Spark和Hadoop叢集上進行深度學習。而不需要為深度學習設定單獨的叢集。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-TXiudywP-1575791675411)(./img/18-2-10-1.png)]
架構圖
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-aYJTPCL7-1575791675412)(./img/18-2-10-2.png)]
運作流程圖
優點:
- 輕松遷移所有現有的TensorFlow程式,<10行代碼更改;
- 支援所有TensorFlow功能:同步/異步訓練,模型/資料并行,推理和TensorBoard;
- 伺服器到伺服器的直接通信在可用時實作更快的學習;
- 允許資料集在HDFS和由Spark推動的其他來源或由TensorFlow拖動;
- 輕松內建您現有的資料處理流水線和機器學習算法(例如,MLlib,CaffeOnSpark);
- 輕松部署在雲或内部部署:CPU和GPU,以太網和Infiniband。
- TensorFlowOnSpark是基于google的TensorFlow的實作,而TensorFlow有着一套完善的教程,内容豐富。
劣勢:
- 開源時間不長,未得到充分的驗證。
Github 位址:https://github.com/yahoo/TensorFlowOnSpark
18.3 如何選擇合适的分布式計算架構進行模型訓練?
18.4 如何進行實時計算?
18.4.1 什麼是實時流計算?
所謂實時流計算,就是近幾年由于資料得到廣泛應用之後,在資料持久性模組化不滿足現狀的情況下,急需資料流的瞬時模組化或者計算處理。這種實時計算的應用執行個體有金融服務、網絡監控、電信資料管理、 Web 應用、生産制造、傳感檢測,等等。在這種資料流模型中,單獨的資料單元可能是相關的元組(Tuple),如網絡測量、呼叫記錄、網頁通路等産生的資料。但是,這些資料以大量、快速、時變(可能是不可預知)的資料流持續到達,由此産生了一些基礎性的新的研究問題——實時計算。實時計算的一個重要方向就是實時流計算。
18.4.2 實時流計算過程
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-BksFSNXN-1575791675413)(./img/18-4-1.png)]
我們以熱賣産品的統計為例,看下傳統的計算手段:
- 将使用者行為、log等資訊清洗後儲存在資料庫中.
- 将訂單資訊儲存在資料庫中.
- 利用觸發器或者協程等方式建立本地索引,或者遠端的獨立索引.
- join訂單資訊、訂單明細、使用者資訊、商品資訊等等表,聚合統計20分鐘内熱賣産品,并傳回top-10.
- web或app展示.
這是一個假想的場景,但假設你具有處理類似場景的經驗,應該會體會到這樣一些問題和難處:
-
水準擴充問題(scale-out)
顯然,如果是一個具有一定規模的電子商務網站,資料量都是很大的。而交易資訊因為涉及事務,是以很難直接舍棄關系型資料庫的事務能力,遷移到具有更好的scale-out能力的NoSQL資料庫中。
那麼,一般都會做sharding。曆史資料還好說,我們可以按日期來歸檔,并可以通過批處理式的離線計算,将結果緩存起來。
但是,這裡的要求是20分鐘内,這很難。
-
性能問題
這個問題,和scale-out是一緻的,假設我們做了sharding,因為表分散在各個節點中,是以我們需要多次入庫,并在業務層做聚合計算。
問題是,20分鐘的時間要求,我們需要入庫多少次呢?
10分鐘呢?
5分鐘呢?
實時呢?
而且,業務層也同樣面臨着單點計算能力的局限,需要水準擴充,那麼還需要考慮一緻性的問題。
是以,到這裡一切都顯得很複雜。
- 業務擴充問題
假設我們不僅僅要處理熱賣商品的統計,還要統計廣告點選、或者迅速根據使用者的通路行為判斷使用者特征以調整其所見的資訊,更加符合使用者的潛在需求等,那麼業務層将會更加複雜。
也許你有更好的辦法,但實際上,我們需要的是一種新的認知:
這個世界發生的事,是實時的。
是以我們需要一種實時計算的模型,而不是批處理模型。
我們需要的這種模型,必須能夠處理很大的資料,是以要有很好的scale-out能力,最好是,我們都不需要考慮太多一緻性、複制的問題。
那麼,這種計算模型就是實時計算模型,也可以認為是流式計算模型。
現在假設我們有了這樣的模型,我們就可以愉快地設計新的業務場景:
- 轉發最多的微網誌是什麼?
- 最熱賣的商品有哪些?
- 大家都在搜尋的熱點是什麼?
-
我們哪個廣告,在哪個位置,被點選最多?
或者說,我們可以問:
這個世界,在發生什麼?
最熱的微網誌話題是什麼?
我們以一個簡單的滑動視窗計數的問題,來揭開所謂實時計算的神秘面紗。
假設,我們的業務要求是:
統計20分鐘内最熱的10個微網誌話題。
解決這個問題,我們需要考慮:
- 資料源
這裡,假設我們的資料,來自微網誌長連接配接推送的話題。
- 問題模組化
我們認為的話題是#号擴起來的話題,最熱的話題是此話題出現的次數比其它話題都要多。
比如:@foreach_break : 你好,#世界#,我愛你,#微網誌#。
“世界”和“微網誌”就是話題。
- 計算引擎采用storm
- 定義時間
時間的定義是一件很難的事情,取決于所需的精度是多少。
根據實際,我們一般采用tick來表示時刻這一概念。
在storm的基礎設施中,executor啟動階段,采用了定時器來觸發“過了一段時間”這個事件。
如下所示:
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
receive-queue (:receive-queue executor-data)
context (:worker-context executor-data)]
(when tick-time-secs
(if (or (system-id? (:component-id executor-data))
(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))))
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
(schedule-recurring
(:user-timer worker)
tick-time-secs
tick-time-secs
(fn []
(disruptor/publish
receive-queue
[[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
)))))))
之前的博文中,已經詳細分析了這些基礎設施的關系,不了解的童鞋可以翻看前面的文章。
每隔一段時間,就會觸發這樣一個事件,當流的下遊的bolt收到一個這樣的事件時,就可以選擇是增量計數還是将結果聚合并發送到流中。
bolt如何判斷收到的tuple表示的是“tick”呢?
負責管理bolt的executor線程,從其訂閱的消息隊列消費消息時,會調用到bolt的execute方法,那麼,可以在execute中這樣判斷:
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
結合上面的setup-tick!的clojure代碼,我們可以知道SYSTEM_TICK_STREAM_ID在定時事件的回調中就以構造函數的參數傳遞給了tuple,那麼SYSTEM_COMPONENT_ID是如何來的呢?
可以看到,下面的代碼中,SYSTEM_TASK_ID同樣傳給了tuple:
;; 請注意SYSTEM_TASK_ID和SYSTEM_TICK_STREAM_ID
(TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)
然後利用下面的代碼,就可以得到SYSTEM_COMPONENT_ID:
public String getComponentId(int taskId) {
if(taskId==Constants.SYSTEM_TASK_ID) {
return Constants.SYSTEM_COMPONENT_ID;
} else {
return _taskToComponent.get(taskId);
}
}
滑動視窗
有了上面的基礎設施,我們還需要一些手段來完成“工程化”,将設想變為現實。
這裡,我們看看Michael G. Noll的滑動視窗設計。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-ZeYZq0wj-1575791675415)(./img/18-4-2.png)]
Topology
String spoutId = "wordGenerator";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
// 這裡,假設TestWordSpout就是我們發送話題tuple的源
builder.setSpout(spoutId, new TestWordSpout(), 5);
// RollingCountBolt的時間視窗為9秒鐘,每3秒發送一次統計結果到下遊
builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
// IntermediateRankingsBolt,将完成部分聚合,統計出top-n的話題
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
"obj"));
// TotalRankingsBolt, 将完成完整聚合,統計出top-n的話題
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
上面的topology設計如下:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-W9B5MLAU-1575791675416)(./img/18-4-3.png)]
将聚合計算與時間結合起來
前文,我們叙述了tick事件,回調中會觸發bolt的execute方法,那可以這麼做:
RollingCountBolt:
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
LOG.debug("Received tick tuple, triggering emit of current window counts");
// tick來了,将時間視窗内的統計結果發送,并讓視窗滾動
emitCurrentWindowCounts();
}
else {
// 正常tuple,對話題計數即可
countObjAndAck(tuple);
}
}
// obj即為話題,增加一個計數 count++
// 注意,這裡的速度基本取決于流的速度,可能每秒百萬,也可能每秒幾十.
// 記憶體不足? bolt可以scale-out.
private void countObjAndAck(Tuple tuple) {
Object obj = tuple.getValue(0);
counter.incrementCount(obj);
collector.ack(tuple);
}
// 将統計結果發送到下遊
private void emitCurrentWindowCounts() {
Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
lastModifiedTracker.markAsModified();
if (actualWindowLengthInSeconds != windowLengthInSeconds) {
LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
}
emit(counts, actualWindowLengthInSeconds);
}
上面的代碼可能有點抽象,看下這個圖就明白了,tick一到,視窗就滾動:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-sc8sgVkj-1575791675417)(./img/18-4-4.png)]
IntermediateRankingsBolt & TotalRankingsBolt:
public final void execute(Tuple tuple, BasicOutputCollector collector) {
if (TupleUtils.isTick(tuple)) {
getLogger().debug("Received tick tuple, triggering emit of current rankings");
// 将聚合并排序的結果發送到下遊
emitRankings(collector);
}
else {
// 聚合并排序
updateRankingsWithTuple(tuple);
}
}
其中,IntermediateRankingsBolt和TotalRankingsBolt的聚合排序方法略有不同:
IntermediateRankingsBolt的聚合排序方法:
@Override
void updateRankingsWithTuple(Tuple tuple) {
// 這一步,将話題、話題出現的次數提取出來
Rankable rankable = RankableObjectWithFields.from(tuple);
// 這一步,将話題出現的次數進行聚合,然後重排序所有話題
super.getRankings().updateWith(rankable);
}
TotalRankingsBolt的聚合排序方法:
@Override
void updateRankingsWithTuple(Tuple tuple) {
// 提出來自IntermediateRankingsBolt的中間結果
Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
// 聚合并排序
super.getRankings().updateWith(rankingsToBeMerged);
// 去0,節約記憶體
super.getRankings().pruneZeroCounts();
}
而重排序方法比較簡單粗暴,因為隻求前N個,N不會很大:
private void rerank() {
Collections.sort(rankedItems);
Collections.reverse(rankedItems);
}
結語
下圖可能就是我們想要的結果,我們完成了t0 - t1時刻之間的熱點話題統計.
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-xqwOYtKC-1575791675417)(./img/18-4-5.png)]
18.5 如何進行離線計算?
18.6 如何使用分布式架構提高模型訓練速度?
18.7 深度學習分布式計算架構如何在移動網際網路中應用?
18.8 如何在個性化推薦中應用深度學習分布式架構?
18.9 如何評價個性化推薦系統的效果?
18.9.1 準确率與召回率(Precision & Recall)
準确率和召回率是廣泛用于資訊檢索和統計學分類領域的兩個路徑成本,用來評價結果的品質。其中精度是檢索出相關文檔數與檢索出的文檔總數的比率,衡量的是檢索系統的查準率;召回率是指檢索出的相關文檔數和文檔庫中所有的相關文檔數的比率,衡量的是檢索系統的查全率。
一般來說,Precision就是檢索出來的條目(比如:文檔、網頁等)有多少是準确的,Recall就是所有準确的條目有多少被檢索出來了。
正确率、召回率和 F 值是在魚龍混雜的環境中,選出目标的重要評價名額。不妨看看這些名額的定義先:
正确率 = 提取出的正确資訊條數 / 提取出的資訊條數
召回率 = 提取出的正确資訊條數 / 樣本中的資訊條數
兩者取值在0和1之間,數值越接近1,查準率或查全率就越高。
F值 = 正确率 * 召回率 * 2 / (正确率 + 召回率) (F 值即為正确率和召回率的調和平均值)
不妨舉這樣一個例子:某池塘有1400條鯉魚,300隻蝦,300隻鼈。現在以捕鯉魚為目的。撒一大網,逮着了700條鯉魚,200隻蝦,100隻鼈。那麼,這些名額分别如下:
正确率 = 700 / (700 + 200 + 100) = 70%
召回率 = 700 / 1400 = 50%
F值 = 70% * 50% * 2 / (70% + 50%) = 58.3%
不妨看看如果把池子裡的所有的鯉魚、蝦和鼈都一網打盡,這些名額又有何變化:
正确率 = 1400 / (1400 + 300 + 300) = 70%
召回率 = 1400 / 1400 = 100%
F值 = 70% * 100% * 2 / (70% + 100%) = 82.35%
由此可見,正确率是評估捕獲的成果中目标成果所占得比例;召回率,顧名思義,就是從關注領域中,召回目标類别的比例;而F值,則是綜合這二者名額的評估名額,用于綜合反映整體的名額。
當然希望檢索結果Precision越高越好,同時Recall也越高越好,但事實上這兩者在某些情況下有沖突的。比如極端情況下,我們隻搜尋出了一個結果,且是準确的,那麼Precision就是100%,但是Recall就很低;而如果我們把所有結果都傳回,那麼比如Recall是100%,但是Precision就會很低。是以在不同的場合中需要自己判斷希望Precision比較高或是Recall比較高。如果是做實驗研究,可以繪制Precision-Recall曲線來幫助分析。
18.9.2 綜合評價名額(F-Measure)
P和R名額有時候會出現的沖突的情況,這樣就需要綜合考慮他們,最常見的方法就是F-Measure(又稱為F-Score)。
F-Measure是Precision和Recall權重調和平均:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-gFdiunt5-1575791675418)(./img/18-9-2-1.png)]
當參數α=1時,就是最常見的F1,也即
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-rRTnB1aP-1575791675419)(./img/18-9-2-2.png)]
可知F1綜合了P和R的結果,當F1較高時則能說明試驗方法比較有效。
18.9.3 E值
E值表示查準率P和查全率R的權重平均值,當其中一個為0時,E值為1,其計算公式:
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-Q8tbTOQU-1575791675420)(./img/18-9-3-1.png)]
b越大,表示查準率的權重越大。
18.9.4 平均正确率(Average Precision)
平均正确率表示不同查全率的點上的正确率的平均。
18.10 參考文獻
【1】http://www.paddlepaddle.org/documentation/book/zh/0.11.0/05.recommender_system/index.cn.html
【2】https://deeplearning4j.org/cn/compare-dl4j-torch7-pylearn.html
【3】http://mahout.apache.org/
【4】http://spark.apache.org/docs/1.1.0/mllib-guide.html
【5】https://ray.readthedocs.io/en/latest/tutorial.html
【6】http://spark.apache.org/streaming/
【7】https://github.com/uber/horovod
【8】https://software.intel.com/en-us/articles/bigdl-distributed-deep-learning-on-apache-spark
【9】https://eng.uber.com/petastorm/
【10】https://yahoo.github.io/TensorFlowOnSpark/#
…
未完待續!