序言:大資料處理的核心問題,一是海量資料如何存儲?二是海量資料如何計算?傳統的資料庫處理,無論是存儲資料的能力,還是處理資料的能力,在面對大量資料時,瓶頸漸顯。大資料的誕生很好地處理了以上兩個問題,傳統資料處理體系和大資料體系的差別如下圖所示:

本文将介紹大資料系統最基本元件之處理架構,按之前看過的文章,将其分為三大類:
僅批處理架構,Apache Hadoop,指處理大量資料的任務,無論直接從持久儲存設備處理資料集或首先将資料集載入記憶體,批處理系統在設計過程中充分考慮了資料的量,可提供充足的處理資源。由于批處理在應對大量持久資料方面的表現極為出色,是以經常被用于對曆史資料進行分析。
僅流處理架構,Apache Storm,流處理系統可以處理幾乎無限量的資料,但同一時間隻能處理一條(真正的流處理)或很少量(微批處理,Micro-batch Processing)資料。
混合架構,Apache Spark,Apache Flink,一些處理架構可同時處理批處理和流處理工作負載。這些架構可以用相同或相關的元件和API處理兩種類型的資料,借此讓不同的處理需求得以簡化。
1 Hadoop介紹
1.1 Hadoop簡介
Hadoop 是一個由 Apache 基金會所開發的分布式系統基礎架構,它可以使使用者在不了解分布式底層細節的情況下開發分布式程式,充分利用叢集的威力進行高速運算和存儲。從其定義就可發現,它解決了兩大問題,大資料存儲和大資料分析,也就是 Hadoop 的兩大核心,HDFS 和 MapReduce。(from https://www.cnblogs.com/binarylei/p/8903601.html)
HDFS(Hadoop Distributed File System)是可擴充、容錯、高性能的分布式檔案系統,異步複制,一次寫入多次讀取,主要負責存儲。
MapReduce 為分布式計算架構,包含map(映射)和 reduce(歸約)過程,負責在 HDFS 上進行計算。
我們先來了解下 Hadoop 的發展曆史
2002~2004 年,第一輪網際網路泡沫剛剛破滅,很多網際網路從業人員都失業了。我們們的“主角" Doug Cutting 也不例外,他隻能寫點技術文章賺點稿費來養家糊口。但是 Doug Cutting 不甘寂寞,懷着對夢想和未來的渴望,與他的好朋友 Mike Cafarella 一起開發出一個開源的搜尋引擎 Nutch,并曆時一年把這個系統做到能支援億級網頁的搜尋。但是當時的網頁數量遠遠不止這個規模,是以兩人不斷改進,想讓支援的網頁量再多一個數量級。
在 2003 年和 2004 年, Googles 分別公布了 GFS 和 Mapreduce 兩篇論文。 Doug Cutting 和 Mike Cafarella 發現這與他們的想法不盡相同,且更加完美,完全脫離了人工運維的狀态,實作了自動化。
在經過一系列周密考慮和詳細總結後,2006 年, Dog Cutting 放奔創業,随後幾經周折加入了 yahoo 公司(Nutch 的部分也被正式引入),機綠巧合下,他以自己兒子的一個玩具大象的名字 Hadoop 命名了該項。
當系統進入 Yahoo 以後,項目逐漸發展并成熟了起來。首先是叢集規模,從最開始幾十台機器的規模發展到能支援上千個節點的機器,中間做了很多工程性質的工作;然後是除搜尋以外的業務開發, Yahoo 逐漸将自己廣告系統的資料挖掘相關工作也遷移到了 Hadoop 上,使 Hadoop 系統進一步成熟化了。
2007 年,紐約時報在 100 個亞馬遜的虛拟機伺服器上使用 Hadoop 轉換了 4TB 的圖檔資料更加加深了人們對 Hadoope 的印象。
在 2008 年的時侯,一位 Google 的工程師發現要把當時的 Hadoop 放到任意一個叢集中去運是一件很困難的事情,是以就與幾個好朋友成立了ー個專門商業化 Hadoop 的公司 Cloudera。同年, Facebook 團隊發現他們很多人不會寫 Hadoop 的程式,而對 SQL 的一套東西很熟,是以他們就在 Hadoop 上建構了一個叫作 Hive 的軟體,專把 SQL 轉換為 Hadoop 的 Mapreduce 程式。
2011年, Yahoo 将 Hadoop 團隊獨立出來,成立了ー個子公司 Hortonworks,專門提供 Hadoop 相關的服務。
說了這麼多,那 Hadoop 有哪些優點呢?
Hadoop 是一個能夠讓使用者輕松架構和使用的分布式計算的平台。使用者可以輕松地在 Hadoop 發和運作處理海量資料的應用程式。其優點主要有以下幾個:
(1) 高可靠性 : Hadoop 按位存儲和處理資料的能力值得人們信賴。
(2) 高擴充性 : Hadoop 是在可用的計算機集簇間配置設定資料并完成計算任務的,這些集簇可以友善地擴充到數以幹計的節點中。
(3) 高效性 : Hadoop能夠在節點之間動态地移動資料,并保證各個節點的動态平衡,是以處理速度非常快。
(4) 高容錯性 : Hadoop能夠自動儲存資料的多個副本,并且能夠自動将失敗的任務重新分。
(5) 低成本 : 與一體機、商用資料倉庫以及 QlikView、 Yonghong Z- Suites 等資料集市相比,Hadoop 是開源的,項目的軟體成本是以會大大降低。
Hadoop 帶有用 Java 語言編寫的架構,是以運作在 linux 生産平台上是非常理想的, Hadoop 上的應用程式也可以使用其他語言編寫,比如 C++。
1.2 Hadoop架構
1.2.1 Hadoop 存儲 - HDFS
Hadoop 的存儲系統是 HDFS(Hadoop Distributed File System)分布式檔案系統,對外部用戶端而言,HDFS 就像一個傳統的分級檔案系統,可以進行建立、删除、移動或重命名檔案或檔案夾等操作,與 Linux 檔案系統類似。
但是,Hadoop HDFS 的架構是基于一組特定的節點建構的,名稱節點(NameNode),它在 HDFS 内部提供中繼資料服務;第二名稱節點(Secondary NameNode),名稱節點的幫助節點,主要是為了整合中繼資料操作(注意不是名稱節點的備份);資料節點(DataNode),它為 HDFS 提供存儲塊。
存儲在 HDFS 中的檔案被分成塊,然後這些塊被複制到多個資料節點中(DataNode),這與傳統的 RAID 架構大不相同。塊的大小(通常為 128M)和複制的塊數量在建立檔案時由客戶機決定。名稱節點可以控制所有檔案操作。HDFS 内部的所有通信都基于标準的 TCP/IP 協定。
關于各個元件的具體描述如下所示:
(1)名稱節點(NameNode)
它是一個通常在HDFS架構中單獨機器上運作的元件,負責管理檔案系統名稱空間和控制外部客戶機的通路。NameNode決定是否将檔案映射到DataNode上的複制塊上。對于最常見的3個複制塊,第一個複制塊存儲在同一機架的不同節點上,最後一個複制塊存儲在不同機架的某個節點上。
(2)資料節點(DataNode)
資料節點也是一個通常在HDFS架構中的單獨機器上運作的元件。Hadoop叢集包含一個NameNode和大量DataNode。資料節點通常以機架的形式組織,機架通過一個交換機将所有系統連接配接起來。
資料節點響應來自HDFS客戶機的讀寫請求。它們還響應來自NameNode的建立、删除和複制塊的指令。名稱節點依賴來自每個資料節點的定期心跳(heartbeat)消息。每條消息都包含一個塊報告,名稱節點可以根據這個報告驗證塊映射和其他檔案系統中繼資料。如果資料節點不能發送心跳消息,名稱節點将采取修複措施,重新複制在該節點上丢失的塊。
(3)第二名稱節點(Secondary NameNode)
第二名稱節點的作用在于為HDFS中的名稱節點提供一個Checkpoint,它隻是名稱節點的一個助手節點,這也是它在社群内被認為是Checkpoint Node的原因。
隻有在NameNode重新開機時,edits才會合并到fsimage檔案中,進而得到一個檔案系統的最新快照。但是在生産環境叢集中的NameNode是很少重新開機的,這意味着當NameNode運作很長時間後,edits檔案會變得很大。而當NameNode當機時,edits就會丢失很多改動,如何解決這個問題呢?
fsimage 是 NameNode 啟動時對整個檔案系統的快照;edits 是在 NameNode 啟動後對檔案系統的改動序列。
Secondary NameNode 會定時到 NameNode 去擷取名稱節點的 edits,并及時更新到自己 fsimage 上。這樣,如果 NameNode 當機,我們也可以使用 Secondary-NameNode 的資訊來恢複 NameNode。并且,如果 Secondary NameNode 新的 fsimage 檔案達到一定門檻值,它就會将其拷貝回名稱節點上,這樣 NameNode 在下次重新開機時會使用這個新的 fsimage 檔案,進而減少重新開機的時間。
舉個資料上傳的例子來深入了解下HDFS内部是怎麼做的。
檔案在用戶端時會被分塊,這裡可以看到檔案被分為 5 個塊,分别是:A、B、C、D、E。同時為了負載均衡,是以每個節點有 3 個塊。下面來看看具體步驟:
- 用戶端将要上傳的檔案按 128M 的大小分塊。
- 用戶端向名稱節點發送寫資料請求。
- 名稱節點記錄各個 DataNode 資訊,并傳回可用的 DataNode 清單。
- 用戶端直接向 DataNode 發送分割後的檔案塊,發送過程以流式寫入。
- 寫入完成後,DataNode 向 NameNode 發送消息,更新中繼資料。
這裡需要注意:
- 寫 1T 檔案,需要 3T 的存儲,3T 的網絡流量。
- 在執行讀或寫的過程中,NameNode 和 DataNode 通過 HeartBeat 進行儲存通信,确定 DataNode 活着。如果發現 DataNode 死掉了,就将死掉的 DataNode 上的資料,放到其他節點去,讀取時,讀其他節點。
- 宕掉一個節點沒關系,還有其他節點可以備份;甚至,宕掉某一個機架也沒關系;其他機架上也有備份。
1.2.2 Hadoop 計算 — MapReduce
MapReduce 是 Google 提出的一個軟體架構,用于大規模資料集(大于1TB)的并行運算。概念“Map(映射)”和“Reduce(歸納)”以及它們的主要思想,都是從函數式程式設計語言借來的,還有從矢量程式設計語言借來的特性。
目前的軟體實作是指定一個 Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定并發的 Reduce(歸納)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。
下面将以 Hadoop 的“Hello World”例程—單詞計數來分析MapReduce的邏輯。一般的 MapReduce 程式會經過以下幾個過程:輸入(Input)、輸入分片(Splitting)、Map階段、Shuffle階段、Reduce階段、輸出(Final result)。
- 輸入就不用說了,資料一般放在 HDFS 上面就可以了,而且檔案是被分塊的。關于檔案塊和檔案分片的關系,在輸入分片中說明。
- 輸入分片:在進行 Map 階段之前,MapReduce 架構會根據輸入檔案計算輸入分片(split),每個輸入分片會對應一個 Map 任務,輸入分片往往和 HDFS 的塊關系很密切。例如,HDFS 的塊的大小是 128M,如果我們輸入兩個檔案,大小分别是 27M、129M,那麼 27M 的檔案會作為一個輸入分片(不足 128M 會被當作一個分片),而 129MB 則是兩個輸入分片(129-128=1,不足 128M,是以 1M 也會被當作一個輸入分片),是以,一般來說,一個檔案塊會對應一個分片。如圖 1-7 所示,Splitting 對應下面的三個資料應該了解為三個分片。
- Map 階段:這個階段的處理邏輯其實就是程式員編寫好的 Map 函數,因為一個分片對應一個 Map 任務,并且是對應一個檔案塊,是以這裡其實是資料本地化的操作,也就是所謂的移動計算而不是移動資料。如圖 1-7 所示,這裡的操作其實就是把每句話進行分割,然後得到每個單詞,再對每個單詞進行映射,得到單詞和1的鍵值對。
- Shuffle 階段:這是“奇迹”發生的地方,MapReduce 的核心其實就是 Shuffle。那麼 Shuffle 的原理呢?Shuffle 就是将 Map 的輸出進行整合,然後作為 Reduce 的輸入發送給 Reduce。簡單了解就是把所有 Map 的輸出按照鍵進行排序,并且把相對鍵的鍵值對整合到同一個組中。如圖 1-7 所示,Bear、Car、Deer、River 是排序的,并且 Bear 這個鍵有兩個鍵值對。
- Reduce 階段:與 Map 類似,這裡也是使用者編寫程式的地方,可以針對分組後的鍵值對進行處理。如圖 1-7 所示,針對同一個鍵 Bear 的所有值進行了一個加法操作,得到 <Bear,2> 這樣的鍵值對。
- 輸出:Reduce 的輸出直接寫入 HDFS 上,同樣這個輸出檔案也是分塊的。
說了這麼多,其實 MapReduce 的本質用一張圖可以完整地表現出來。
MapReduce 的本質就是把一組鍵值對 <K1,V1> 經過 Map 階段映射成新的鍵值對 <K2,V2>;接着經過 Shuffle/Sort 階段進行排序和“洗牌”,把鍵值對排序,同時把相同的鍵的值整合;最後經過 Reduce 階段,把整合後的鍵值對組進行邏輯處理,輸出到新的鍵值對 <K3,V3>。這樣的一個過程,其實就是 MapReduce 的本質。
Hadoop MapReduce 可以根據其使用的資源管理架構不同,而分為 MR v1 和 YARN/MR v2 版本。
在 MR v1 版本中,資源管理主要是 Jobtracker 和 TaskTracker。Jobtracker 主要負責:作業控制(作業分解和狀态監控),主要是 MR 任務以及資源管理;而 TaskTracker 主要是排程 Job 的每一個子任務 task;并且接收 JobTracker 的指令。
在 YARN/MR v2 版本中,YARN 把 JobTracker 的工作分為兩個部分:
- ResourceManager(資料總管)全局管理所有應用程式計算資源的配置設定。
- ApplicationMaster 負責相應的排程和協調。
NodeManager 是每一台機器架構的代理,是執行應用程式的容器,監控應用程式的資源(CPU、記憶體、硬碟、網絡)使用情況,并且向排程器彙報。
1.2.3 Hadoop 資源管理 — YARN
在上一節中我們看到,當 MapReduce 發展到 2.x 時就不使用 JobTracker 來作為自己的資源管理架構,而選擇使用 YARN。這裡需要說明的是,如果使用 JobTracker 來作為 Hadoop 叢集的資源管理架構的話,那麼除了 MapReduce 任務以外,不能夠運作其他任務。也就是說,如果我們叢集的 MapReduce 任務并沒有那麼飽滿的話,叢集資源等于是白白浪費的。是以提出了另外的一個資源管理架構 YARN(Yet Another Resource Manager)。這裡需要注意,YARN 不是 JobTracker 的簡單更新,而是“大換血”。同時 Hadoop 2.X 也包含了此架構。Apache Hadoop 2.X 項目包含以下子產品。
- Hadoop Common:為 Hadoop 其他子產品提供支援的基礎子產品。
- HDFS:Hadoop:分布式檔案系統。
- YARN:任務配置設定和叢集資源管理架構。
- MapReduce:并行和可擴充的用于處理大資料的模式。
YARN 資源管理架構包括 ResourceManager(資料總管)、Applica-tionMaster、NodeManager(節點管理器)。各個元件描述如下。
(1)ResourceManager
ResourceManager 是一個全局的資料總管,負責整個系統的資源管理和配置設定。它主要由兩個元件構成:排程器(Scheduler)和應用程式管理器(ApplicationManager,AM)。
Scheduler 負責配置設定最少但滿足 Application 運作所需的資源量給 Application。Scheduler 隻是基于資源的使用情況進行排程,并不負責監視/跟蹤 Application 的狀态,當然也不會處理失敗的 Task。
ApplicationManager 負責處理用戶端送出的 Job 以及協商第一個 Container 以供 App-licationMaster 運作,并且在 ApplicationMaster 失敗的時候會重新啟動 ApplicationMaster(YARN 中使用 Resource Container 概念來管理叢集的資源,Resource Container 是資源的抽象,每個 Container 包括一定的記憶體、IO、網絡等資源)。
(2)ApplicationMaster
ApplicatonMaster 是一個架構特殊的庫,每個 Application 有一個 ApplicationMaster,主要管理和監控部署在 YARN 叢集上的各種應用。
(3)NodeManager
主要負責啟動 ResourceManager 配置設定給 ApplicationMaster 的 Container,并且會監視 Container 的運作情況。在啟動 Container 的時候,NodeManager 會設定一些必要的環境變量以及相關檔案;當所有準備工作做好後,才會啟動該 Container。啟動後,NodeManager 會周期性地監視該 Container 運作占用的資源情況,若是超過了該 Container 所聲明的資源量,則會 kill 掉該 Container 所代表的程序。
如圖 1-11 所示,該叢集上有兩個任務(對應 Node2、Node6 上面的 AM),并且 Node2 上面的任務運作有 4 個 Container 來執行任務;而 Node6 上面的任務則有 2 個 Container 來執行任務。
1.2.4 Hadoop 生态系統
Hadoop 的生态圈其實就是一群動物在狂歡。我們來看看一些主要的架構。
(1)HBase
HBase(Hadoop Database)是一個高可靠性、高性能、面向列、可伸縮的分布式存儲系統,利用 HBase 技術可在廉價 PC Server 上搭建起大規模結構化存儲叢集。
(2)Hive
Hive 是建立在 Hadoop 上的資料倉庫基礎構架。它提供了一系列的工具,可以用來進行資料提取轉化加載(ETL),這是一種可以存儲、查詢和分析存儲在 Hadoop 中的大規模資料的機制。
(3)Pig
Pig 是一個基于 Hadoop 的大規模資料分析平台,它提供的 SQL-LIKE 語言叫作 Pig Latin。該語言的編譯器會把類 SQL 的資料分析請求轉換為一系列經過優化處理的 Map-Reduce 運算。
(4)Sqoop
Sqoop 是一款開源的工具,主要用于在 Hadoop(Hive)與傳統的資料庫(MySQL、post-gresql等)間進行資料的傳遞,可以将一個關系型資料庫中的資料導入 Hadoop 的 HDFS 中,也可以将 HDFS 的資料導入關系型資料庫中,如圖 1-13 所示。
(5)Flume
Flume 是 Cloudera 提供的一個高可用、高可靠、分布式的海量日志采集、聚合和傳輸的系統,Flume 支援在日志系統中定制各類資料發送方,用于收集資料。同時,Flume 提供對資料進行簡單處理并寫到各種資料接受方(可定制)的能力,如圖 1-14 所示。
(6)Oozie
Oozie 是基于 Hadoop 的排程器,以 XML 的形式寫排程流程,可以排程 Mr、Pig、Hive、shell、jar 任務等。
主要的功能如下。
- Workflow:順序執行流程節點,支援 fork(分支多個節點)、join(将多個節點合并為一個)。
- Coordinator:定時觸發 Workflow。
- Bundle Job:綁定多個 Coordinator。
(7)Chukwa
Chukwa 是一個開源的、用于監控大型分布式系統的資料收集系統。它建構在 Hadoop 的 HDFS 和 MapReduce 架構上,繼承了 Hadoop 的可伸縮性和魯棒性。Chukwa 還包含了一個強大和靈活的工具集,可用于展示、監控和分析已收集的資料。
(8)ZooKeeper
ZooKeeper 是一個開放源碼的分布式應用程式協調服務,是 Google 的 Chubby 一個開源的實作,是 Hadoop 和 Hbase 的重要元件,如圖 1-15 所示。它是一個為分布式應用提供一緻性服務的軟體,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
(9)Avro
Avro 是一個資料序列化的系統。它可以提供:豐富的資料結構類型、快速可壓縮的二進制資料形式、存儲持久資料的檔案容器、遠端過程調用 RPC。
(10)Mahout
Mahout 是 Apache Software Foundation(ASF)旗下的一個開源項目,提供一些可擴充的機器學習領域經典算法的實作,旨在幫助開發人員更加友善快捷地建立智能應用程式。Mahout 包含許多實作,包括聚類、分類、推薦過濾、頻繁子項挖掘。此外,通過使用 Apache Hadoop 庫,可以有效地将 Mahout 擴充到雲中。
1.3 應用場景
Hadoop适用于海量資料、離線資料和負責資料,應用場景如下,
場景1:資料分析,如海量日志分析,商品推薦,使用者行為分析
場景2:離線計算,(異構計算+分布式計算)
場景3:海量資料存儲
2 Storm介紹
from https://www.cnblogs.com/zhaojiankai/p/757617.html
2.1 Storm簡介
Apache Storm是自由開源的分布式實時計算系統,擅長處理海量資料,适用于資料實時處理而非批處理。Storm中核心概念:
Nimbus:Storm叢集主節點,負責資源配置設定和任務排程。我們送出任務和截止任務都是在Nimbus上操作的。一個Storm叢集隻有一個Nimbus節點。
Supervisor:Storm叢集工作節點,接受Nimbus配置設定任務,管理所有Worker。
Worker:工作程序,每個工作程序中都有多個Task。
Task:任務,每個Spout和Bolt都是一個任務,每個任務都是一個線程。
Topology:計算拓撲,包含了應用程式的邏輯。
Stream:消息流,關鍵抽象,是沒有邊界的Tuple序列。
Spout:消息流的源頭,Topology的消息生産者。
Bolt:消息處理單元,可以過濾、聚合、查詢資料庫。
Stream grouping:消息分發政策,一共6種,定義每個Bolt接受何種輸入。
Reliability:可靠性,Storm保證每個Tuple都會被處理。
2.2 Strom架構
Zookeeper叢集在Storm叢集中的作用:
Zookeeper叢集負責Nimbus節點和Supervior節點之間的通信,監控各個節點之間的狀态。比如通常我們送出任務的時候是在Nimbus節點上執行的,Nimbus節點通過zk叢集将任務分發下去,而Supervisor是真正執行任務的地方。Nimbus節點通過zk叢集監控各個Supervisor節點的狀态,當某個Supervisor節點出現故障的時候,Nimbus節點就會通過zk叢集将那個Supervisor節點上的任務重新分發,在其他Supervisor節點上執行。這就意味着Storm叢集也是高可用叢集,如果Nimbus節點出現故障的時候,整個任務并不會停止,但是任務的管理會出現影響,通常這種情況下我們隻需要将Nimbus節點恢複就可以了。Nimbus節點不支援高可用,這也是Storm目前面臨的問題之一。不過一般情況下,Nimbus節點的壓力不大,通常不會出現問題。
一般情況下,Zookeeper叢集的壓力并不大,一般隻需要部署3台就夠了。Zookeeper叢集在Storm叢集中邏輯上是獨立的,但在實際部署的時候,一般會将zk節點部署在Nimbus節點或Supervisor節點上。
storm處理資料的特點:資料源源不斷,不斷處理。
storm中是沒有資料存儲結構的,我們需要自己設計資料落地接口,指明資料存儲到哪一部分中。Storm本身是不存儲資料的。
3 Spark介紹
3.1 Spark簡介
Apache Spark是一個圍繞速度、易用性和複雜分析建構的大資料處理架構,最初在2009年由加州大學伯克利分校的AMPLab開發,并于2010年成為Apache的開源項目之一,與Hadoop和Storm等其他大資料和MapReduce技術相比,Spark有如下優勢:
Spark提供了一個全面、統一的架構用于管理各種有着不同性質(文本資料、圖表資料等)的資料集和資料源(批量資料或實時的流資料)的大資料處理的需求。官方資料介紹Spark可以将Hadoop叢集中的應用在記憶體中的運作速度提升100倍,甚至能夠将應用在磁盤上的運作速度提升10倍
3.2 Spark架構
通常當需要處理的資料量超過了單機尺度(比如我們的計算機有4GB的記憶體,而我們需要處理100GB以上的資料)這時我們可以選擇spark叢集進行計算,有時我們可能需要處理的資料量并不大,但是計算很複雜,需要大量的時間,這時我們也可以選擇利用spark叢集強大的計算資源,并行化地計算,其架構示意圖如下:
- Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是建構在RDD和Spark Core之上的
- Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行互動的API。每個資料庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。
- Spark Streaming:對實時資料流進行處理和控制。Spark Streaming允許程式能夠像普通RDD一樣處理實時資料
- MLlib:一個常用機器學習算法庫,算法被實作為對RDD的Spark操作。這個庫包含可擴充的學習算法,比如分類、回歸等需要對大量資料集進行疊代的操作。
- GraphX:控制圖、并行圖操作和計算的一組算法和工具的集合。GraphX擴充了RDD API,包含控制圖、建立子圖、通路路徑上所有頂點的操作
- Spark架構的組成圖如下:
- Cluster Manager:在standalone模式中即為Master主節點,控制整個叢集,監控worker。在YARN模式中為資料總管
- Worker節點:從節點,負責控制計算節點,啟動Executor或者Driver。
- Driver: 運作Application 的main()函數
- Executor:執行器,是為某個Application運作在worker node上的一個程序
Spark與hadoop:
- Hadoop有兩個核心子產品,分布式存儲子產品HDFS和分布式計算子產品Mapreduce
- spark本身并沒有提供分布式檔案系統,是以spark的分析大多依賴于Hadoop的分布式檔案系統HDFS
- Hadoop的Mapreduce與spark都可以進行資料計算,而相比于Mapreduce,spark的速度更快并且提供的功能更加豐富
關系圖如下:
運作流程及特點:
spark運作流程圖如下:
- 建構Spark Application的運作環境,啟動SparkContext
- SparkContext向資料總管(可以是Standalone,Mesos,Yarn)申請運作Executor資源,并啟動StandaloneExecutorbackend,
- Executor向SparkContext申請Task
- SparkContext将應用程式分發給Executor
- SparkContext建構成DAG圖,将DAG圖分解成Stage、将Taskset發送給Task Scheduler,最後由Task Scheduler将Task發送給Executor運作
- Task在Executor上運作,運作完釋放所有資源
Spark運作特點:
- 每個Application擷取專屬的executor程序,該程序在Application期間一直駐留,并以多線程方式運作Task。這種Application隔離機制是有優勢的,無論是從排程角度看(每個Driver排程他自己的任務),還是從運作角度看(來自不同Application的Task運作在不同JVM中),當然這樣意味着Spark Application不能跨應用程式共享資料,除非将資料寫入外部存儲系統
- Spark與資料總管無關,隻要能夠擷取executor程序,并能保持互相通信就可以了
- 送出SparkContext的Client應該靠近Worker節點(運作Executor的節點),最好是在同一個Rack裡,因為Spark Application運作過程中SparkContext和Executor之間有大量的資訊交換
- Task采用了資料本地性和推測執行的優化機制
常用術語:
Application: Appliction都是指使用者編寫的Spark應用程式,其中包括一個Driver功能的代碼和分布在叢集中多個節點上運作的Executor代碼
Driver: Spark中的Driver即運作上述Application的main函數并建立SparkContext,建立SparkContext的目的是為了準備Spark應用程式的運作環境,在Spark中有SparkContext負責與ClusterManager通信,進行資源申請、任務的配置設定和監控等,當Executor部分運作完畢後,Driver同時負責将SparkContext關閉,通常用SparkContext代表Driver
Executor: 某個Application運作在worker節點上的一個程序, 該程序負責運作某些Task, 并且負責将資料存到記憶體或磁盤上,每個Application都有各自獨立的一批Executor, 在Spark on Yarn模式下,其程序名稱為CoarseGrainedExecutor Backend。一個CoarseGrainedExecutor Backend有且僅有一個Executor對象, 負責将Task包裝成taskRunner,并從線程池中抽取一個空閑線程運作Task, 這個每一個oarseGrainedExecutor Backend能并行運作Task的數量取決與配置設定給它的cpu個數
Cluter Manager:指的是在叢集上擷取資源的外部服務。目前有三種類型
-
- Standalon : spark原生的資源管理,由Master負責資源的配置設定
- Apache Mesos:與hadoop MR相容性良好的一種資源排程架構
- Hadoop Yarn: 主要是指Yarn中的ResourceManager
Worker: 叢集中任何可以運作Application代碼的節點,在Standalone模式中指的是通過slave檔案配置的Worker節點,在Spark on Yarn模式下就是NoteManager節點
Task: 被送到某個Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是運作Application的基本機關,多個Task組成一個Stage,而Task的排程和管理等是由TaskScheduler負責
Job: 包含多個Task組成的并行計算,往往由Spark Action觸發生成, 一個Application中往往會産生多個Job
Stage: 每個Job會被拆分成多組Task, 作為一個TaskSet, 其名稱為Stage,Stage的劃分和排程是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生shuffle的地方
DAGScheduler: 根據Job建構基于Stage的DAG(Directed Acyclic Graph有向無環圖),并送出Stage給TASkScheduler。 其劃分Stage的依據是RDD之間的依賴的關系找出開銷最小的排程方法,如下圖
TASKSedulter: 将TaskSET送出給worker運作,每個Executor運作什麼Task就是在此處配置設定的. TaskScheduler維護所有TaskSet,當Executor向Driver發生心跳時,TaskScheduler會根據資源剩餘情況配置設定相應的Task。另外TaskScheduler還維護着所有Task的運作标簽,重試失敗的Task。下圖展示了TaskScheduler的作用
4 Flink介紹
4.1 Flink簡介
Flink核心是一個流式的資料流執行引擎,其針對資料流的分布式計算提供了資料分布、資料通信以及容錯機制等功能。基于流執行引擎,Flink提供了諸多更高抽象層的API以便使用者編寫分布式任務:
DataSet API, 對靜态資料進行批處理操作,将靜态資料抽象成分布式的資料集,使用者可以友善地使用Flink提供的各種操作符對分布式資料集進行處理,支援Java、Scala和Python。
DataStream API,對資料流進行流處理操作,将流式的資料抽象成分布式的資料流,使用者可以友善地對分布式資料流進行各種操作,支援Java和Scala。
Table API,對結構化資料進行查詢操作,将結構化資料抽象成關系表,并通過類SQL的DSL對關系表進行各種查詢操作,支援Java和Scala。
此外,Flink還針對特定的應用領域提供了領域庫,例如:
Flink ML,Flink的機器學習庫,提供了機器學習Pipelines API并實作了多種機器學習算法。
Gelly,Flink的圖計算庫,提供了圖計算的相關API及多種圖計算算法實作。
為什麼我會接觸到 Flink 呢?因為我目前在負責的是監控平台的告警部分,負責采集到的監控資料會直接往 kafka 裡塞,然後告警這邊需要從 kafka topic 裡面實時讀取到監控資料,并将讀取到的監控資料做一些 聚合/轉換/計算 等操作,然後将計算後的結果與告警規則的門檻值進行比較,然後做出相應的告警措施(釘釘群、郵件、短信、電話等)。畫了個簡單的圖如下:
4.2 Flink架構
Flink 是一個開源的分布式流式處理架構:
①提供準确的結果,甚至在出現無序或者延遲加載的資料的情況下。
②它是狀态化的容錯的,同時在維護一次完整的的應用狀态時,能無縫修複錯誤。
③大規模運作,在上千個節點運作時有很好的吞吐量和低延遲。
更早的時候,我們讨論了資料集類型(有界 vs 無窮)和運算模型(批處理 vs 流式)的比對。Flink 的流式計算模型啟用了很多功能特性,如狀态管理,處理無序資料,靈活的視窗,這些功能對于得出無窮資料集的精确結果是很重要的。
除了提供資料驅動的視窗外,Flink還支援基于時間,計數,session等的靈活視窗。視窗能夠靈活的觸發條件定制化進而達到對複雜的流傳輸模式的支援。Flink的視窗使得模拟真實的建立資料的環境成為可能。
Flink的容錯能力事輕量級的,允許系統提供高并發,同時在同一時間提供強一緻性保證。Flink以零資料丢失的方式從故障中恢複,但沒有考慮可靠性和延遲之間的折中。
Flink可以滿足高并發和低延遲(計算大量資料很快)。下圖顯示了Apache Flink與Apache Storm在完成流資料清晰的分布式的性能對比
Flink儲存點提供了一個狀态化的版本機制,使得能以無丢失狀态和最短時間的停機方式更新應用和回退曆史資料。
Flink被設計成用上千個點在大規模叢集上運作。除了支援獨立叢集部署外,Flink還支援YARN和Me'sos方式部署。
Flink的程式内在事并行和分布式的,資料流可以被分區成stram partitions,operators被劃分為operator subtasks;這些subtasks在不同的機器火容器中分不同的西城獨立運作;operator subtasks的數量在具體的operator就是并行計算數,程式不同的operator 階段可能有不同的并行數;如下圖所示,source operator 的并行書為2,但是最後的sink operator 為1;
flink 作業送出架構流程可見下圖:
1、Program Code:我們編寫的 Flink 應用程式代碼
2、Job Client:Job Client 不是 Flink 程式執行的内部部分,但它是任務執行的起點。 Job Client 負責接受使用者的程式代碼,然後建立資料流,将資料流送出給 Job Manager 以便進一步執行。 執行完成後,Job Client 将結果傳回給使用者
、Job Manager:主程序(也稱為作業管理器)協調和管理程式的執行。 它的主要職責包括安排任務,管理checkpoint ,故障恢複等。機器叢集中至少要有一個 master,master 負責排程 task,協調 checkpoints 和容災,高可用設定的話可以有多個 master,但要保證一個是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三個重要的元件
4、Task Manager:從 Job Manager 處接收需要部署的 Task。Task Manager 是在 JVM 中的一個或多個線程中執行任務的工作節點。 任務執行的并行性由每個 Task Manager 上可用的任務槽決定。 每個任務代表配置設定給任務槽的一組資源。 例如,如果 Task Manager 有四個插槽,那麼它将為每個插槽配置設定 25% 的記憶體。 可以在任務槽中運作一個或多個線程。 同一插槽中的線程共享相同的 JVM。 同一 JVM 中的任務共享 TCP 連接配接和心跳消息。Task Manager 的一個 Slot 代表一個可用線程,該線程具有固定的記憶體,注意 Slot 隻對記憶體隔離,沒有對 CPU 隔離。預設情況下,Flink 允許子任務共享 Slot,即使它們是不同 task 的 subtask,隻要它們來自相同的 job。這種共享可以有更好的資源使用率。
5 各處理方式對比
請參考 https://www.jianshu.com/p/5cc07eae1a0c
謝謝