天天看點

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

開源最大的特征就是開放性,雲生态則讓開源技術更具開放性與創造性,Elastic 與阿裡雲的合作正是開源與雲生态共生共榮的典範。值此合作三周年之際,我們邀請業界資深人士相聚雲端,共話雲上Elasticsearch生态與技術的未來。
Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

本篇内容是力萌資訊資料技術專家李猛帶來的Elasticsearch基于Pipeline視窗函數實作實時聚合計算

分享人:力萌資訊資料技術專家李猛

視訊位址:

https://developer.aliyun.com/live/246153

本文将通過三個部分展開介紹Elasticsearch基于Pipeline視窗函數如何實作實時聚合計算

  • Pipeline實時計算模型
  • ES-Pipeline實時計算能力和工作特點
  • ES+X實時計算暢想

一、 Pipeline實時計算模型

首先,我們來探讨一下Pipeline實時計算模型是什麼。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

Pipeline翻譯過來是管道的意思,上圖大家可以看到有三根管子,我們在大資料領域或者在做應用系統的時候,其實程式設計抽象來說就是這三點:輸入資料,process處理,輸出。如果業務程式或者大資料邏輯比較複雜,那麼輸出就會成為下一個管道的輸入,是以就會到第二根管子;第二個管子處理完之後又會到下一根管子,也就是不停地在寫很多複雜的資料處理邏輯。其實早期很多同學去做大資料開發,Map-reduce如果把中間的資料描述出來會發現也是這三步。Map- reduce-map-reduce的循環,中間的資料一直在,也是按照這種管道的模型不斷地在變化。其實最早Pipeline的思維是來自于早期接觸到Elastic公司推出的ELK三件套,其中采集資料的程式設計的模型配置裡面有三個步驟其實就應對了這個模型,在程式設計裡面可以input,然後可以接受上一次處理的邏輯,這就是管道。

Streaming計算模型

我們在大資料領域經常會談到一個概念叫流式計算,而Pipeline這個模型無論是做實時計算還是離線計算,其實思維都是Streaming的計算模型。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

如圖,DStream資料從上遊進行到下遊,是按照追加這種方式的一種流計算,我們要去從中提取資料進行一些模型和處理。而在SPARK領域,它目前的流計算隻支援微批處理這個概念,但這并不影響我們管道實施的思維模型。

Streaming視窗計算模型

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

在Spark中我們又可以做一個視窗計算模型,建立一個window,設定一個1-2分鐘的時間視窗,在時序的資料處理場景确實很合适,比如使用spark streaming等等。當然有很多資料場景上它并不是時序的,這個時候我們要基于别的邏輯去做這種資料的視窗計算,我最早的視窗思維也是來自于Spark,大概四五年前系統地學習Spark的時候,第一次感受到window其實就是我們傳統地做資料統計的時候設定的一個時間範圍,隻是換了一個計算引擎。以前我們是在資料庫裡標明一個時間視窗,在Spark裡我們是基于一個記憶體模式把資料丢進去,然後在這個範圍内計算。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

目前在實時計算領域,Flink絕對是領先的。大家可以看到,在Flink計算體系裡,中間是Flink的計算引擎,它的上遊可以支援很多日志,IOT,Clicks,real-time,離線型的Base事件,中間它會通過一些管道處理,這些管道本身就可以支援互相的穿插,通過管道把一些資料處理完後輸出到另外一個管道,做下一步的資料。如果資料最終沒有再繼續處理,就把這個資料交換出去,存到某一個系統裡面。

圖中大家可以看到,用Flink做實時計算至少需要三點:

第一是上遊的資料輸入源input,中間就需要用flink,當然也可以選擇自己寫入程式,如果簡單的話是沒有任何問題的,或者是用logstash等等。下遊的輸出端也有很多,可以輸到下一個應用程式裡面,可能是另外一組的Flink或其他的的系統,也可以輸入到Event Log,也可以輸出到最終的資料庫裡面。但我們想講的Pipeline并不是這個意思,這隻是我的一個思維起源,Flink這個架構産品幫我解決了很多排程的問題。

Q:現有流計算的問題是什麼?

在标準的實時計算的領域,至少需要三個以上的步驟。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

第一,現在基本上大部分企業都會選擇Kafka ,中間選擇Flink,下遊根據應用程式邏輯有可能還需要下一步的處理,會放到Kafka,也可能最終不需要處理,需要給Elasticsearch做最終查詢。目前來說Elasticsearch在資料查詢方面是領先的,在這個領域裡是最好用的。我們在做一個标準的實時流計算的時候,基于Pipeline會需要輸入-處理-輸出,也就是去搭建一個哪怕是輕量級的實時資料處理計算都需要融入至少三個産品,會帶來什麼問題?大家可以想象,在IT裡做系統架構等等,每增加一個處理的環節,增加一個節點,或者每增加一個大型的資料系統融入進來,這個系統的複雜性就會增加好幾倍,并且資料架構的可靠性也會降低。同時,這三個産品對于研發人員和對架構師的考驗非常大,不能保證很快就可以完成。

舉個例子,我們一般資料計算之後,會用Elasticsearch去存儲,而Elasticsearch也有多方面的配置,索引的建立,叢集的搭建等等又回到一個現實的問題,就是資料庫是否需要一個DBA,上面有什麼業務在并行,是以每一次當我們融入新的技術都會遇到問題。是以今天探讨的是,雖然Pipeline計算模型非常好,但是現在的實時計算的思維是有問題的,一些産品并不能簡化我們的操作。

二、 ES Pipeline實時計算能力

ES基于它已有的特性,為了避免上面三件套的問題,在Pipeline上做了一些工作。

1、ES Ingest Pipeline

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

Elasticsearch推出了Ingest功能,即資料處理。圖中大家可以看到,上遊資料用任何輸入源都是可以的,然後經過Elasticsearch裡Ingest這個具備處理能力的節點,讓你的資料經過Pipeline管道的處理,可能需要拆分資料做一些複雜的計算。比如本來傳入一個性别,需要把性别轉換成矩陣,把男和女轉化為零和一這樣的差別,然後就可以在這裡面去程式設計,完成之後資料直接輸出到一個索引裡。圖中大家可以看到,相比之前的基于标準三件套的實時處理要簡化了很多,用一個産品可以替代之前的幾個産品。可以把Kafka,Flink的邏輯合并掉,全部用Elasticsearch來替代,就更加便利。是以如果你對ES有興趣或是熟知的話,可以大規模地使用它,它也有很多缺點,但是它非常的實用,用一個技術替代了三個技術,從圖中可以看到它其實也是完全符合Pipeline聚合模型的。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

Ingest裡提供了很多pipeline 的函數,其中有一個功能叫 enrich。當資料輸入後,ES的索引可以做一些反查比對,然後再把資料豐富一下。相當于資料進來後要去查一下資料庫,然後把一些附屬的屬性綁定,最終塞到另外一個索引裡,其實這也是一個标準的管道。是以在現實世界裡管道程式設計的這種思維其實早以深入人心,并廣泛應用。右圖是來自于一個标準的函數,這個函數裡還有一個用來給原資料增加一些字段的processor,這個是ES的一個特性。 對于這些特性不用去刻意地記錄,當你認可我們這種編制思維模型的時候就可以去學習一下。

2、ES Rollup Pipeline

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

Rollup,即資料上卷,圖中左邊大家看到資料有三種顔色,這些資料表示的是一個點選事件,記錄某一些使用者點選的事件,性别。在原始的資料裡,我們可以想象,假設做一個網站或系統,pv、uv的統計,每天可能有上10億的資料量,但是我們分析需求的時候實際上隻需要分析今天什麼時候哪個域名點選了多少次,這個我們可以通過Rollup這個概念把資料做一次轉換,壓縮。到右邊,我們的資料就已經精确到天或者按小時。對于ES來說,它其實也是遵循三個關鍵步驟:第一,原始資料輸入到Elasticsearch裡存起來,中間開啟一個Rollup實時計算的能力,然後把資料經過一定的折疊之後,輸出到另外一個索引,這樣就完全滿足了Pipeline的思維,也完全滿足了實時計算。是以在這個領域, ES算做了一個偉大的創新,隻需要一套去處理,就可以把管道模型深切地融入到自己的資料裡。如果基于Flink做資料統計,上遊會先用Kafka輸入資料,然後中間用Flink計算,比如每1000條,每1萬條資料就Rollup一下,然後輸出到另外一個下遊又會放到ES裡來存儲,這就會帶來技術的成本,實施的代價。

我們用顔色解決了海量明細資料的查詢,但是現在又有一些隔夜的統計需求,比如定期的,該如何去做?正常來說,有些大資料的專家給你建議可能會需要把資料取出來,這個其實并不是很好,是以這個時候如果你用的是Elasticsearch這一套件,可以自己建立一個Rollup節點輸到另外一個就完成了。帶來的經濟效益不言而喻,而且功能特性也非常簡單。

3、ES Transform Pipeline

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

Rollup其實是基于時間的次元,那麼如果資料并沒有時間,想要做Pipeline轉換的處理也是一樣的。在ES裡推出了功能叫Transform Pipeline的模型,同樣可以解決這種問題,隻是統計方式略有不同。在Transform裡使用者也可以編輯很多自己的函數,也可以寫一些複雜的腳本。

4、ES Aggregations Pipeline

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

接下來講到ES強大的聚合能力,聚合其實是做統計,比如1000條資料要做一個SUM的值,或者AVG,Min ,Max等等,這些就叫Aggregation。在Elasticsearch 裡其實是會根據現實世界融入自己的一些邏輯。圖中我們可以看到,聚合一共分成了兩次,初次聚合基于原始索引輸入做一次聚合,聚合之後壓縮,比如原來有一億條資料,聚合出來大概的結果就隻有1萬,然後再基于這1萬左右的資料又要做二次聚合。在ES裡這個思維就叫Pipeline,可以在官方網站搜尋到叫Elastic Pipeline的聚合。二次聚合之後,ES把第一次和第二次聚合的結果一起推給應用端,這個特性是其他很多資料産品沒有的。比如原本資料庫裡的SQL是做不到的,其他的一些大型産品可能會用到,但是複雜程度極高,是以比較推薦ES這個功能。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

在Elastic裡,Pipeline的視窗函數其實提供了很多方式,比如Aggregation目前提供了至少圖中這三種方式。比如可以選擇moving avg計算移動的平均值,通過自定義函數moving fn寫自定義腳本等等。

以上就是基于Pipeline實作視窗計算的一些現實的問題和邏輯,還有ES的一些特性。

三、 ES+X實時計算暢談

(一)資料程式設計本質

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

現實世界中,寫一個函數,做一個系統,會發現這個系統提供對外一些API輸入資料,然後系統做一些邏輯,邏輯之後又輸出,可能輸出到下一個處理環節,如果是資料庫就存在資料庫裡面。輸入的地方也是一樣,假設是一個rest API,接受外圍的請求,處理,輸出,傳回資料,假如是一個大資料裡面的離線計算批處理,也是寫一個批處理邏輯輸入資料,比如Spark讀取檔案,然後做計算,輸出,可能又存到輸入上去了,或者是輸到ES。如果這一步完成了,我們在大資料裡面經常還要做排程編排,一般會用Airflow去完成,在編寫Airflow流程的時候,實際上是編寫一個一個流的方式,然後把它挪到下面去,再輸入到下一個邏輯。

(二)流式實時計算資料庫

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

雖然ES也具備一些實時的處理能力,但它有很明顯的局限性。比如它不支援非常精确毫秒級的處理。抛開這個限制,我希望有一種資料産品既可以充當Kafka的角色,也能支援流式資料進來。我可以編寫簡單的SQL處理函數在上面做一些計算,經過Pipeline可以支援很多函數,同時這些資料又可以再回到資料庫裡去消化掉,這個資料庫同時又可以對外提供查詢或者其他的能力。其實我想探讨的就是這個思維,all in one,我們需要一個叫做Streaming Database的概念,希望有一個流式資料庫,把資料的in和process,還有資料的output全部融在一起,讓整個程式設計模型變得簡單高效。相比之下之前所提的三件套就太過于标準化,過于陳舊,雖然它很吸引人,很優秀。

(三)Elasticsearch + X暢談

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

我希望ES可以支援這三種門類,其實它已經支援了兩種,就是資料的process裡有in,transformer,還有aggregation,還有一些其他的函數能力。如果能把上遊的流式解決,我相信未來它應該可以占領更多的流式的處理市場。

其實ES本身是支援索引的,如果把這個索引充當一個Buffer流就會面臨,資料可以寫進來但是讀資料就比較麻煩,因為它的資料在順序上沒有得到很好的控制,資料可以從頭寫到尾,但我們無法像Kafka一樣從尾讀到頭,這就是ES還需要改進的地方。是以希望未來市場上有更多Streaming産品,因為現在它已經具備了Bash批處理模拟,如果它未來加入了,我相信在ES的市場上絕對能夠很好地發展。比如把Kafka的節點融進來;把Kafka的特性、具備的能力加入到ES裡面來;建立索引的時候直接就可以建立一個隊列的索引;寫資料的時候可以從頭寫,讀的時候可以從底部讀,不用按照标準的索引search去查了。然後經過process節點的時候,processor具備了Flink的處理能力和邏輯能力,最終輸出到索引。

這就是我想和大家探讨的一個全新思維,雖然标題講的是基于視窗的,但其實視窗計算在整個大資料裡隻是占了很小的一點,從high level這個層次來看,實時計算流法還是本質。

Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算

阿裡雲Elastic Stack

】100%相容開源ES,獨有9大能力,提供免費 X-pack服務(單節點價值$6000)

相關活動

更多折扣活動,請

通路阿裡雲 Elasticsearch 官網 阿裡雲 Elasticsearch 商業通用版,1核2G ,SSD 20G首月免費 阿裡雲 Logstash 2核4G首月免費 下載下傳白皮書:Elasticsearch 八大經典場景應用
Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算
Elasticsearch生态&技術峰會 | Elasticsearch基于Pipeline視窗函數實作實時聚合計算