我記得有一次到一家公司做内部分享,然後有研發問我,即席分析這塊,他們用es遇到一些問題。我當時直接就否了,我說es還是個全文檢索引擎,如果要做分析,還是應該用impala,phenix等這種主打分析的産品。随着es的發展,我現在對它的看法,也有了比較大的變化。而且我認為es+spark sql組合可以很好的增強即席分析能夠處理的資料規模,并且能夠實作複雜的邏輯,獲得較好的易用性。
需要說明的是,我對這塊現階段的了解也還是比較淺。問題肯定有不少,歡迎指正。
lucene 有三個比較核心的概念:
反向索引
fielddata/docvalue
collector
反向索引不用我講了,就是term -> doclist的映射。
fielddata/docvalue 你可以簡單了解為列式存儲,索引檔案的所有文檔的某個字段會被單獨存儲起來。 對于這塊,lucene 經曆了兩階段的發展。第一階段是fielddata ,查詢時從反向索引反向構成doc-term。這裡面有兩個問題:
資料需要全部加載到記憶體
第一次建構會很慢
這兩個問題其實會衍生出很多問題:最嚴重的自然是記憶體問題。是以lucene後面搞了docvalue,在建構索引的時候就生成這個檔案。docvalue可以充分利用作業系統的緩存功能,如果作業系統cache住了,則速度和記憶體通路是一樣的。
另外就是collector的概念,es的各個aggregator 實作都是基于collector做的。我覺得你可以簡單的了解為一個疊代器就好,所有的候選集都會調用collector.collect(doc)方法,這裡collect == iterate 可能會更容易了解些。
es 能把聚合做快,得益于這兩個資料結構,一個疊代器。我們大部分聚合功能,其實都是在fielddata/docvalue 上工作的。
aggregations種類分為:
metrics
bucket
metrics 是簡單的對過濾出來的資料集進行avg,max等操作,是一個單一的數值。
bucket 你則可以了解為将過濾出來的資料集按條件分成多個小資料集,然後metrics會分别作用在這些小資料集上。
對于最後聚合出來的結果,其實我們還希望能進一步做處理,是以有了pipline aggregations,其實就是組合一堆的aggregations 對已經聚合出來的結果再做處理。
aggregations 類設計
下面是一個聚合的例子:
其語義類似這個sql 語句: select count(*) as user_count group by user order by user_count desc。
對于aggregations 的解析,基本是順着下面的路徑分析:
在實際的一次query裡,要做如下幾個階段:
query phase 此時 會調用globalordinalsstringtermsaggregator的collector 根據user 的不同進行計數。
rescorephase
suggestphase
aggregationphase 在該階段會會執行實際的aggregation build, aggregator.buildaggregation(0),也就是一個特定shard(分片)的聚合結果
mergephase。這一步是由接受到請求的es來完成,具體負責執行merge(reduce)操作searchphasecontroller.merge。這一步因為會從不同的分片拿到資料再做reduce,也是一個記憶體消耗點。是以很多人會專門搞出幾台es來做這個工作,其實就是es的client模式,不存資料,隻做接口響應。
在這裡我們我們可以抽取出幾個比較核心的概念:
aggregatorfactory (生成對應的aggregator)
aggregation (聚合的結果輸出)
aggregator (聚合邏輯實作)
另外值得注意的,pipeline aggregator 我前面提到了,其實是對已經生成的aggregations重新做加工,這個工作是隻能單機完成的,會放在請求的接收端執行。
前面的例子提到,在query 階段,其實就會調用aggregator 的collect 方法,對所有符合查詢條件的文檔集都會計算一遍,這裡我們涉及到幾個對象:
doc id
field (docvalue)
intarray 對象
collect 過程中會得到 doc id,然後拿着docid 到 docvalue裡去拿到field的值(一般而言字元串也會被編碼成int類型的),然後放到intarray 進行計數。如果多個doc id 在某filed裡的字段是相同的,則會遞增計數。這樣就實作了group by 的功能了。
我之前一直在想這個問題,後面看了下es-hadoop的文檔,發現自己有些思路和現在es-hadoop的實作不謀而合。主要有幾點:
spark-sql 的 where 語句全部(或者部分)下沉到 es裡進行執行,依賴于反向索引,docvalues,以及分片,并行化執行,es能夠獲得比spark-sql更優秀的響應時間
其他部分包括分片資料merge(reduce操作,spark 可以獲得更好的性能和分布式能力),更複雜的業務邏輯都交給spark-sql (此時資料規模已經小非常多了),并且可以做各種自定義擴充,通過udf等函數
es 無需實作merge操作,可以減輕記憶體負擔,提升并行merge的效率(并且現階段似乎es的reduce是隻能在單個執行個體裡完成)