在流式計算中,資料是持續不斷來的,有時候我們要對一些資料做跨周期(duration)的統計,這個時候就不得不維護狀态了。而狀态管理對spark 的 rdd模型是個挑戰,因為在spark裡,任何資料集都需要通過rdd來呈現,而rdd 的定義是一個不變的分布式集合。在狀态管理中,比如spark streaming中的word-count 就涉及到更新原有的記錄,比如在batch 1 中 a 出現1次,batch 2中出現3次,則總共出現了4次。這裡就有兩種實作:
擷取batch 1 中的 狀态rdd 和目前的batch rdd 做co-group 得到一個新的狀态rdd。這種方式完美的契合了rdd的不變性,但是對性能卻會有比較大的影響,因為需要對所有資料做處理,計算量和資料集大小是成線性相關的。這個我們後續會詳細讨論。
第二種是一種變通的實作。因為沒法變更rdd/partition等核心概念,是以spark streaming在集合元素上做了文章,定義了mapwithstaterdd,将該rdd的元素做了限定,必須是mapwithstaterddrecord 這個東西。該mapwithstaterddrecord 保持有某個分區的所有key的狀态(通過statemap記錄)以及計算結果(mappeddata),元素mapwithstaterddrecord 變成可變的,但是rdd 依然是不變的。
這兩個方案分别對應了 updatestatebykey/mapwithstate 的實作。
前言
parquet性能
<a href="https://yq.aliyun.com/articles/60199">自動記憶體管理模型</a>
<a href="https://yq.aliyun.com/articles/60245">流式狀态管理10倍性能提升</a>
之前就想系統的對這三塊仔細闡述下。現在總算有了第二篇。
本文會從三個方面展開:
updatestatebykey的實作;
mapwithstate(1.6新引入的流式狀态管理)的實作
mapwithstate額外内容
在 關于狀态管理中,我們已經描述了一個大概。該方法可以在org.apache.spark.streaming.dstream.pairdstreamfunctions中找到。調用該方法後會建構出一個org.apache.spark.streaming.dstream.statedstream對象。計算的方式也較為簡單,核心邏輯是下面兩行代碼:
首先将prevstaterdd 和 parentrdd(新batch 的資料) 做一次cogroup,形成了 (k, seq[v], seq[w]) 這樣的結果集。你會發現和updatestatebykey 要求的(seq[v], option[s])簽名還是有些類似的。事實上這裡的seq[v] 就是parentrdd的對應k 的新的值。為了适配他兩,spark 内部會對你傳進來的updatefunc 做兩次轉換,進而使得你的函數能夠接受(k, seq[v], seq[w])這樣的參數。看到這,想必你也就知道為啥updatestatebykey 接受的函數簽名是那樣的了。
前文我們提到,這樣做很漂亮,代碼也少,契合rdd的概念,然而你會發現無論parentrdd裡有多少key,哪怕是隻有一個,也需要對原有所有的資料做cogroup 并且全部做一遍處理(也就是應用你的update函數)。顯然這是很低效的。很多場景下,新的batch 裡隻有一小部分資料,但是我們卻不得不對所有的資料都進行計算。
可以為key 設定ttl(timeout)
使用者可以對傳回值進行控制
前面我們提到,在新的mapwithstate api 中,核心思路是建立一個新的mapwithstaterdd,該rdd的元素是 mapwithstaterddrecord,每個mapwithstaterddrecord 記錄某個partiton下所有key的state。
依然的,你在org.apache.spark.streaming.dstream.pairdstreamfunctions 可以看到mapwithstate 簽名。
這一段代碼有三點值得注意:
該接口在1.6 中還是 experimental 狀态
接受的不是一函數,而是一個statespec 的對象。
傳回了一個新的dstream
其實statespec 隻是一個包裹,你在實際操作上依然是定義一個函數,然後通過statespec進行包裹一下。以 wordcount 為例:
接着statespec.function(mappingfunc) 包裹一下就可以傳遞給mapwithstate。我們看到該函數更加清晰,word 是k,one新值,state 是原始值(本batch之前的狀态值)。這裡你需要把state 更新為新值,該實作是做了一個内部狀态維護的,不像updatestatebykey一樣,一切都是現算的。
mapwithstatedstreamimpl 的compute邏輯都委托給了internalmapwithstatedstream,最終要得到mapwithstaterdd,基本是通過下面的邏輯來計算的:
這裡有個很重要的操作是對datardd進行了partition操作,保證和prevstaterdd 按相同的分區規則進行分區。這個在後面做計算時有用。
擷取到prevstaterdd,接着擷取目前batch的資料的rdd,最後組裝成一個新的mapwithstaterdd。mapwithstaterdd 還接受你定義的函數mappingfunction以及key的逾時時間。
其中mapwithstaterdd 和别的rdd 不同之處在于rdd裡的元素是mapwithstaterddrecord 對象。其實prevstaterdd 也是個mapwithstaterdd 。
整個實際計算邏輯都在mapwithstaterddrecord.updaterecordwithdata 方法裡。
前面我們提到,mapwithstaterddrecord 是prevstaterdd 裡的元素。有多少個分區,就有多少個mapwithstaterddrecord 。一個record 對應一個分區下所有資料的狀态。在mapwithstaterddrecord.updaterecordwithdata 方法中,第一步是copy 目前record 的狀态。這個copy是非常快的。我們會在mapwithsate額外内容 那個章節有更詳細的分析。
接着定義了兩個變量,其中mappeddata 會作為最後的計算結果傳回,wrappedstate 類似hadoop裡的 text,你可以不斷給它指派,然後獲得一些新的功能,避免傳回建立對象。它主要是給state添加了一些方法,比如update,define狀态等。
接着周遊目前batch 所有的資料,并且應用使用者定義的函數。這裡我們看到,我們隻對目前batch的資料進行函數計算,而不是針對曆史全集資料進行計算,這是一個很大的性能提升點。接着根據wrappedstate的狀态對newstatemap做更新,主要是删除或者資料的更新。最後将新的結果傳回并且放到mappeddata 。
上面這段邏輯,你會發現一個問題,如果dataiterator 裡有重複的資料比如某個k 出現多次,則mappeddata也會有多次。以wordcount 為例:

hello 出現了三次,是以會加入到mappeddata中三次。其實我沒發現這麼做的意義,并且我認為會對記憶體占用造成一定的壓力。
如果你想要最後的結果,需要調用完mapwithstate 之後需要再調用一次statesnapshots,就可以拿到第三欄的計算結果了。
經過上面的計算,我們對parentrdd裡的每個分區進行計算,得到了mappeddata以及newstatemap,這兩個對象一起建構出mapwithstaterddrecord,而該record 則形成一個partition,最後構成新的mapwithstaterdd。
mapwithstaterddrecord 透過statemap 維護了某個分區下所有key的目前狀态。 在前面的分析中,我們第一步便是clone old statemap。如果集合非常大,拷貝也是很費時才對,而且還耗費記憶體。
是以如何實作好statemap 變得非常重要:
實作過程采用的是 增量copy。也叫deltamap。 新建立的statemap 會引用舊的statemap。新增資料會放到新的statemap中,而更新,删除,查找等操作則有可能發生在老得statemap上。缺點也是有的,如果statemap 鍊路太長,則可能會對性能造成一定的影響。我們隻要在特定條件下做合并即可。目前是超過delta_chain_length_threshold=20 時會做合并。
使用 org.apache.spark.util.collection.openhashmap,該實作比java.util.hashmap 快5倍,并且占用更少的記憶體空間。不過該hashmap 無法進行删除操作。