天天看點

在雲上搭建大規模實時資料流處理系統

在大資料時代,資料規模變得越來越大。由于資料的增長速度和非結構化的特性,常用的軟硬體工具已無法在使用者可容忍的時間内對資料進行采集、管理和處理。本文主要介紹如何在阿裡雲上使用Kafka和Storm搭建大規模消息分發和實時資料流處理系統,以及這個過程中主要遭遇的一些挑戰。實踐主要立足建立一套汽車狀态實時監控系統,可以在阿裡雲上立即進行部署。

大資料時代,随着可擷取資料的管道增多,比如常見的電子商務、網絡、傳感器的資料流、太空資料等,資料規模也變得越來越大;同時,不同的管道往往産生更多的資料類型,這些衍生的資料增長非常之快,規模非常之大。大資料時代各個機構可謂是坐擁金山,然而目前大資料技術的應用卻仍然存在衆多挑戰,主要出現在資料收集、存儲、處理和可視化幾個過程。

圖1是Storm的一個簡單的架構。

<b>圖1  Storm架構</b>

Apache Kafka也是一個開源的系統,旨在提供一個統一的,高吞吐、低延遲的分布式消息處理平台來對實時資料進行處理。它最早由LinkedIn開發,開源于2011年并被貢獻給了Apache。Kafka差別于傳統RabbitMQ、Apache ActiveMQ等消息系統的地方主要在于:分布式系統特性,易于擴充;為釋出和訂閱提供高吞吐量;支援多訂閱,可以自動平衡消費者;可以将消息持久化到磁盤,可以用于批量消費,例如ETL等。

<b>圖2  Kafka架構</b>

我們需要設計一個實時車輛監控系統,這個系統要将汽車駕駛過程中實時的位置,速度,轉速,油耗以及轉速發送到系統中,進而可以實時計算出車流量和污染物排放量。該系統的目标是要能同僚支援10萬輛車同時發送消息,在最高峰能滿足100萬輛車。為了實作如此規模的消息分發和吞吐,我們基于Kafka和Storm來設計實作。同時為了滿足高擴充性,我們将Storm和Kafka分别部署到不同的伺服器上,如果需要更多的計算能力,可以随時通過建立新的伺服器的方式來完成。此外為了滿足高可用性,每台相同功能的伺服器也需要至少部署2台,這樣一旦一台伺服器出現問題,另外一台伺服器也可以持續提供服務。

在實體伺服器上部署Storm和Kafka等系統涉及到大量伺服器叢集和軟體的安裝部署,這個過程需要花費大量時間,而雲計算則很好的彌補了這一點——提供各種虛拟伺服器和鏡像功能,加快基礎設施和軟體的部署過程。

<b></b>

<b>圖3  車聯網監控系統架構</b>

我們需要2台伺服器來建構Kafka代理伺服器,在Storm中還需要2台伺服器來運作Spout和2個Bolt,另外在Redis層則需要2台伺服器來部署緩存,再加上2台伺服器作為Web伺服器。伺服器架構圖如圖4所示。

<b>圖4  車聯網監控系統架構</b>

在部署車聯網監控系統之前,我們首先需要在每台伺服器上部署相應的軟體,包括Git、Libzmq、Java、G++等,用于代碼編譯和相關軟體安裝。可以使用SSH連接配接到相應的機器。使用者名密碼則會由阿裡雲以郵件或者短消息的方式提供。

在車聯網實時監控系統中,我們需要部署4種不同類型的伺服器,分别是網站前台伺服器、Kafka伺服器、Storm伺服器和緩存伺服器,以滿足上面提到的高擴充性的要求。在每一種類型的伺服器部署完成之後,都可以通過阿裡雲鏡像的功能,建立一個能随時使用的鏡像,這樣在擴充伺服器的時候就不需要重新安裝軟體,直接通過鏡像建立伺服器就可以了。

以下指令需要在所有伺服器上運作以安裝相應的軟體:

以下指令安裝在緩存伺服器和Kafka伺服器上:

另外,我們還需要在Storm的伺服器安裝maven和lein用于代碼編譯:

在Kafka伺服器上安裝Kafka:

對于Storm和Kafka的安裝,到這一步已基本完成,接下去需要分别建立鏡像。建立鏡像的方法是先建立阿裡雲快照,然後通過将快照轉換為鏡像的方式完成。具體步驟如下:

在阿裡雲的管理界面選擇雲伺服器,随後選擇該伺服器的磁盤清單,點選建立快照。

輸入快照名稱并确認。

阿裡雲會自動為雲伺服器的系統盤建立快照,當建立完成以後,會出現“建立自定義鏡像”按鈕。

點選“建立自定義鏡像”的按鈕,阿裡雲就會将這個快照轉換為鏡像,可以在阿裡雲ECS管理界面的自定義鏡像欄中看到。

<b>圖5  自定義的鏡像</b>

接下來,我們通過鏡像可以直接建立相同配置的ECS伺服器。

<b>圖6  從自定義鏡像中建立雲伺服器</b>

當然,在自動擴充實作上,雲服務并不需要使用者去手動執行,這裡我們使用阿裡雲的ECS REST API自動通過鏡像建立機器。可以參考以下Python代碼,自動建立阿裡雲ECS虛拟機:

<b>基于</b><b>Storm</b><b>和</b><b>Kafka</b><b>的車輛資訊實時監控系統打造</b>

接下來做的就是将車輛資訊實時監控系統部署到系統中。這個系統示範了如何編寫一個Storm的Topology,從Kafka消息系統中将資訊讀取出來。我們使用Kafka的用戶端模拟從世界各地發送車輛實時資訊給Kafka叢集,然後Storm Topology會把這些消息通過Bolts将坐标轉換為Json對象,并且使用GeoJSON在Bing Map上顯示車輛的實時位置、溫度、轉速以及速度等等資訊。Topology還會将資訊寫到Redis緩存中,然後Node.js通過socket.io讀取Redis中的資訊,并且使用d3js顯示在頁面上。

首先,我們需要編寫Kafka 生産者的部分代碼,主要是模拟讀取汽車的實時資料并向Kafka叢集進行發送,我們實作了一個KafkaCarDataProducer類,通過配置ProducerConfig來建立一個Producer對象來發送資料。它可以用來連接配接到Zookeeper,或者直接是Kafka 代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代碼中我們根據不同的連接配接字元串設定不同配置。僞代碼如下:

然後就可以直接通過下面代碼來發送消息:

接下來我們需要編寫3個Storm類,首先是建立Storm的Topology,這個類叫KafkaCarTopology,我們建立了一個叫car的topic,然後定義本機一個hosts和Zookeeper hosts,最後建立一個Spout,叫做KafkaSpout,然後添加ParseCarDataBolt連接配接到KafkaSout,再建立一個RedisCarBolt,用于将結果寫入Redis緩存。最後根據參數建立3個Worker,送出Storm Topology。

在這個拓撲結構中,我們有2個Bolt用于資料的處理,第一個叫ParserCarDataBolt,這個Bolt主要将Kafka傳出的消息轉換為Json格式,它繼承BaseBasicBolt,在execute函數中通過collector送出資料,同時重載了declareOutputFields函數,通知下一個Bolt的資料格式。代碼如下:

資料會被寫入RedisCarBolt,再寫入到Redis緩存中。它繼承自BaseRichBolt,需要重載prepare和excute方法來處理消息元組。此外還需要重載prepare和cleanup函數,幾個關鍵的函數如下:

最後我們還需要編寫一些Node.js的代碼,保證在頁面上通過socket.io進行通訊,實時将最終資料從Redis裡面讀取出來,并在BingMap上顯示。

到此為止,一個簡單的車輛資訊實時監控系統就實作了,我們通過bash腳本進行編譯,并安裝到相應的伺服器上,比如下列代碼需要被安裝在Storm的伺服器上:

有一點需要注意的是,由于在編譯過程中需要自動下載下傳Storm庫,在阿裡雲的國内機房的虛拟機很有可能需要設定代理進行。設定代理的方法也很簡單,通過對lein指令增加以下參數就可以了:http_proxy=http://URL:PORT

接着我們在網頁上通路http://webhostname或者運作node.js的伺服器,就會看到下面的網頁,同時發現網頁将同步重新整理汽車的實時位置、速度、轉速等。

<b>圖7  車聯網監控系統示範頁面</b>

接下來我們對這個系統進行了一個簡單的吞吐量測試。我們隻有1個Topic,使用5個partition、3個worker、1個Spout和2個Bolt,在一台2核2GB的ECS上運作。我們使用了另外4台用戶端,每個用戶端有4核8G記憶體,分别啟動40個線程不斷向這個系統實時發送汽車資訊,模拟160台汽車發送的情況,其消息發送數量和CPU占用率情況如圖8所示。

<b>圖8  車聯網監控系統性能分析</b>

從圖8中可以看出,平均每輛汽車用戶端會模拟每秒給系統發送了1000條消息,總的吞吐量達到16萬條左右,此時平均的CPU占用率大約在30%左右。如果系統是完全線性的,在系統CPU占用率達到90%的情況下,大約能處理48萬條消息。不過實際情況中,在阿裡雲ECS上,卻發現CPU達到50%以後,就不再上升,而用戶端發送消息的延時也逐漸增加。

經過分析以後發現,由于ECS的磁盤性能無法和實體機的SSD磁盤相比,是以在Kafka消息大量寫入磁盤的過程中,吞吐量下降,磁盤讀寫負擔變得非常大。這時我們增加了Kafka的Broker和Storm的Spout的數量,将消息分布式地分發到多台ECS上,進而實作了消息吞吐量的線性增加。

在這個系統中,我們不推薦使用大核和大記憶體的機器,而推薦使用多台2核2GB的伺服器分布式地處理消息。這也是雲計算處理大資料的原則所在,使用橫向擴充而不用縱向擴充。

至此我們介紹了利用Storm和Kafka實作大資料的實時處理,并且介紹了如何在雲上通過鏡像快速地建立這套系統。此外,我們還介紹了如何對Storm、Kafka、Redis以及Node.js開發出一個實時的車輛資訊監控系統。這個系統能夠實作高性能、大吞吐量和高并發。當然,随着大資料的快速發展,我們相信還會有越來越多好的工具和産品出現在市場上,到那時我們從大資料中擷取有效的資訊将會變得更加容易和便捷。有了雲計算的幫助,開發的周期也會變得越來越短。

繼續閱讀