現在越來越多的技術架構下會組合使用maxcompute和tablestore,用maxcompute作大資料分析,計算的結果會導出到tablestore提供線上通路。maxcompute提供海量資料計算的能力,而tablestore提供海量資料高并發低延遲讀寫的能力。
将maxcompute内資料導出至tablestore,目前可選的幾種主要途徑包括:
自己編寫工具:使用maxcompute sdk通過tunnel讀取表資料,再通過tablestore sdk再寫入資料。
其中第二種是我們最常推薦給使用者做臨時的資料導出使用的,如果沒有需要對資料做特殊處理的需求,我們一般不推薦第一種途徑。
datax在阿裡集團内部已經應用了很多年,經曆了多次雙十一的考驗,是一個穩定、易用、高效的工具。随着maxcompute上結果資料越來越龐大,資料導出的速率越來越被看重,海量的資料需要在基線内完成導出。本篇文章,主要會介紹幾種優化手段,以提高使用datax來進行maxcompute向tablestore資料導出的吞吐量。
優化過程
我們會以實際的場景,來示範如何通過一步步的優化,提升資料導出的速度。在資料導出的整個鍊路上,主要有三個環節,一是maxcompute資料通道的讀,二是datax的資料交換,三是tablestore的線上寫,這三個環節任意一個成為瓶頸,都會影響導出的速度。
maxcompute資料通道的讀的性能比較高,一般不會成為瓶頸,本文主要是針對後兩個環節來優化。優化的核心指導方針就是:1. 提高并發,2. 降低寫入延遲。接下來列舉的幾種優化手段,也是圍繞這兩點,來不斷進行優化。
實驗選擇使用tablestore的測試環境,在maxcompute上,我們會建立一張表并準備1億行資料。tablestore的測試環境規模以及datax job主控端的規格都較小,是以整個實驗最終達到的速率是比較小的,主要為了示範速率如何提升。而在真實的tablestore生産環境上,規模足夠的情況下,我們幫助過應用優化到每秒上百m甚至上g的速度,優化手段相同。
首先在maxcompute内建立如下表:
其次在表内倒入1億行資料,每行資料約200個位元組,其中userid列采用随機值,計算出的md5值取4個位元組作為md5列,資料樣例如下:
md5
userid
name
comments
attr0
attr1
attr2
attr3
create_time
update_time
028f
108217721721
john
0123456789....
0123456789...
20170201
20170206
01d2
192871726121
bill
f01d
284671281623
jura
測試資料導入使用的是maxcompute tunnel,速度還是比較可觀的。
資料準備完畢後,在tablestore上建立一張表,使用md5和userid作為主鍵列:
表和資料均準備完畢後,使用如下datax job配置類進行一次資料導出:
啟動datax任務,從标準輸出中可以看到目前資料導出的速度:
可以看到,目前的速度大約是1mb/s,接下來會示範如何進行優化,一步一步将速度給提升上去。
第一步是對datax的幾個基礎參數進行調優,先大緻了解下一個datax job内部,任務的運作結構:

一個datax job會切分成多個task,每個task會按taskgroup進行分組,一個task内部會有一組reader->channel->writer。channel是連接配接reader和writer的資料交換通道,所有的資料都會經由channel進行傳輸。
在datax内部對每個channel會有嚴格的速度控制,預設的速度限制是1mb/s,這也是為何我們使用預設配置,速度為1mb/s的原因。是以第一個需要優化的基礎參數就是單個channel的速度限制,更改配置如下:
我們把單個channel的速度上限配置為5mb。這個值需要針對不同的場景進行不同的配置,例如對于maxcompute,單個channel的速度可以達到幾十mb,對于tablestore,在列較小較多的場景下,單個channel的速度是幾mb,而在列較大的場景下,可能速度就會上到幾十mb。
我們目前預設配置中配置啟動的job内channel數為1,要提高速度,并發必須提高,這個是第二步要做的優化。但是在做第二個優化之前,還需要調整一個基礎參數,那就是datax job啟動的jvm的記憶體大小配置。
目前datax啟動的jvm預設的配置是"-xms1g -xmx1g",當一個job内channel數變多後,記憶體的占用會顯著增加,因為datax作為資料交換通道,在記憶體中會緩存較多的資料,例如channel中會有一個buffer,作為臨時的資料交換的緩沖區,而在部分reader和writer的中,也會存在一些buffer。
調整jvm參數的方式有兩種,一種是直接更改datax.py,另一種是在啟動的時候,加上對應的參數,如下:
通常我們建議将記憶體設定為4g或者8g,這個也可以根據實際情況來調整。
在優化完單channel的限速和jvm的記憶體參數之後,我們重新跑一下任務:
目前資料導出的速度已經從1mb提升到2mb。
在上一點中指出,目前job内部,隻有單個channel在執行導出任務,而要提升速率,要做的就是提升channel的并發數。
datax内部對每個channel會做限速,可以限制每秒byte數,也可以限制每秒record數。除了對每個channel限速,在全局還會有一個速度限制的配置,預設是不限。
提升channel并發數有三種途徑:
1, 配置全局byte限速以及單channel byte限速,channel個數 = 全局byte限速 / 單channel byte限速。(下面示例中最終channel個數為10)
2,配置全局record限速以及單channel record限速,channel個數 = 全局record限速 / 單channel record限速。(下面示例中最終channel個數為3)
3, 全局不限速,直接配置channel個數。(下面示例中最終channel個數為5)
第三種方式最簡單直接,但是這樣就缺少了全局的限速。在選擇channel個數時,同樣需要注意,channel個數并不是越多越好。channel個數的增加,帶來的是更多的cpu消耗以及記憶體消耗。如果channel并發配置過高導緻jvm記憶體不夠用,會出現的情況是發生頻繁的full gc,導出速度會驟降,适得其反。
可以在datax的輸出日志中,找到本次任務的channel的數:
在我們這次實驗中,我們把channel數直接配置為10,再進行一次導出:
可以看到在channel數從1提升到10之後,速度從2mb/s提升到了9mb/s。此時若再提高channel數到15,速度已經不見漲,而從服務端監控看,每批次導入的寫入延遲确在漲,說明目前瓶頸在tablestore寫入端。
在上面幾個優化做完後,datax資料交換這一環節已經不是瓶頸,目前瓶頸在tablestore端的寫入能力上。tablestore是分布式的存儲,一張大表會被切分成很多的分區,分區會分散到後端的各個實體機上提供服務。一張新建立的表,預設分區數為1,當這張表越來越大,tablestore會将其分裂,此時分裂是自動完成的。分區的個數,一定程度上與能提供的服務能力相關。某些業務場景,建立表後,就需要對表進行大規模的資料導入,此時預設的單個分區肯定是不夠用的,當然可以等資料量慢慢漲上來後等表自動分裂,但是這個周期會比較長。此時,我們推薦的做法是在建立表的時候進行預分區。
不過目前我們還沒有對外開放通過sdk來進行預分區的功能,是以如果需要對表進行預分區,可以先通過工單來聯系我們幫助進行預分區。
我們建立一張表,并将表預分4個分區,partition key為md5列,采用md5列的主要原因是在其上資料的分區基本是均勻的。如果資料在partition key分布不均勻,則即使做了預分區,導入性能也不會得到明顯的提升。以相同的job配置,再跑一下導出任務:
此時速度從9mb/s提升到18mb/s左右,在tablestore服務端能夠提高更多的服務能力後,我們嘗試再将channel的并發從10提高到15:
此時速度又進一步提升,從18mb/s提升到22mb/s左右。
我們建構的場景,每行大約是200位元組左右大小。datax的otswriter寫入插件底層是使用的tablestore sdk提供的batchwrite接口進行資料寫入,預設一次請求寫入100行資料,也就是說一次請求隻會導入約20kb大小的資料。每次寫過來的資料包都比較小,非常的不經濟。
目前tablestore的batchwrite的限制比較不靈活,會限制行數和資料大小,其中行數預設上限是200行。在每行都比較小的場景下,200行一次批量寫入是非常不經濟的,在我們的這次實驗中,我們将上限改為1000行,并将datax tablestore寫入插件内部一次批量寫入的行數也改為1000行,來驗證将每次寫入的包變大後,對寫入效率的提升。任務配置更改如下(配置項為job.content.writer.parameter.batchwritecount):
再次執行任務,速度如下:
速度再次提升,從22mb/s提升到29mb/s。tablestore後續會優化對batchwrite的行數限制,對于行比較小的場景采用一個比較友好的政策。
以上優化政策都是在單個datax job的場景下進行的優化,單個datax job隻能夠運作在單台伺服器上,沒有辦法分布式的執行。d2上的托管伺服器,一般是千兆網卡,也就是說最多提供100mb/s的速度。若想要進一步的速度提升,則必須采用多個datax job分布在多台伺服器上執行才行。
datax内的odpsreader,可以通過配置一次導出整張表或者表的某個partition。我們可以利用partition,來将一張表拆分成多個job分散導出,但是要求表必須是多分區的。
在我們的實驗中,建立的maxcompute表并不是多分區的,我們重新建立一張多分區的表:
增加一列為partid,作為分區,我們通過一個sql将原表的資料導入到新表,并自動均勻的分散到partid:
以上sql會将partid的值取自md5列的第一個字元,md5是一個十六進制的值,字元的取值範圍是:0-f,這樣我們就将原表切成了一個帶16個分區的表。我們希望在每個分區内,資料都是均勻的,為了避免長尾,這也是為什麼要設計一個md5列的原因。
在将一張表拆成多個分區後,我們就可以選擇在不同的伺服器上,為每個分區啟動一個任務,配置如下(job.content.reader.parameter.partition):
由于測試叢集規模的原因,我們不示範多個job并發後的速度提升。在tablestore服務端能力不是瓶頸的情況下,通過擴充datax job的并發,速度是能線性提升的。
總結下上面的幾個優化點:
對datax的幾個基本參數進行調整,包括:channel數、單個channel的限速以及jvm的記憶體參數。
建立tablestore表的時候盡量采取預分區,在設計partition key的時候盡量保證在每個partition key上導入資料的分布均勻。
如果導入tablestore的資料行都比較小,則需要考慮提高單批次的導入行數。
若單個datax job已成瓶頸,則需要考慮将任務拆成多個datax job并行執行。
希望以上經驗對各位有用,歡迎交流。