天天看點

MaxCompute Tunnel SDK資料上傳利器——BufferedWriter使用指南

maxcompute 的資料上傳接口(tunnel)定義了資料 block 的概念:一個 block 對應一個 http request,多個 block 的上傳可以并發而且是原子的,一次同步請求要麼成功要麼失敗,不會污染其他的 block。這種設計對于服務端來講十分簡潔,但是也把記錄狀态做 failover 的工作交給了用戶端。

使用者在使用 tunnel sdk 程式設計時,需要對 block 這一層的語義進行認知,并且驅動資料上傳的整個過程[1],并且自己進行容錯,畢竟『網絡錯誤是正常而不是異常』。由于使用者文檔中并沒有強調這一點的重要性,導緻很多使用者踩了坑,一種常見的出錯場景是,當用戶端寫資料的速度過慢,兩次 write 的間隔逾時[2],導緻整個 block 上傳失敗。

maxcompute java sdk 在 0.21.3-public  之後新增了 bufferredwriter 這個更高層的 api,簡化了資料上傳的過程,并且提供了容錯的功能。 bufferedwriter 對使用者隐藏了 block 這個概念,從使用者角度看,就是在 session 上打開一個 writer 然後往裡面寫記錄即可:

具體實作時 bufferedwriter 先将記錄緩存在用戶端的緩沖區中,并在緩沖區填滿之後打開一個 http 連接配接進行上傳。bufferedwriter 會盡最大可能容錯,保證資料上傳上去。

由于屏蔽了底層細節,這個接口可能并不适合資料預劃分、斷點續傳、分批次上傳等需要細粒度控制的場景。

多線程上傳時,每個線程隻需要打開一個 writer 往裡面寫資料就行了。

由于底層在上傳出錯時會回避一段固定的時間并進行重試,但如果你的程式不想花太多時間在重試上,或者你的程式位于一個極其惡劣的網絡環境中,為此 tunnelbufferedwriter 允許使用者配置重試政策。

使用者可以選擇三種重試回避政策:指數回避(exponential_backoff)、線性時間回避(linear_backoff)、常數時間回避(constant_backoff)。

例如下面這段代碼可以将,write 的重試次數調整為 6,每一次重試之前先分别回避 4s、8s、16s、32s、64s 和 128s(從 4 開始的指數遞增的序列)。

如果你的程式對 jvm 的記憶體有嚴格的要求,可以通過下面這個接口修改緩沖區占記憶體的位元組數(bytes):

預設配置每一個 writer 的 buffersize 是 10 mib。tunnelbufferedwriter 一次 flush buffer 的操作上傳一個 block 的資料[3]。

由于一個 session 的上傳狀态是通過維護一個 block list 實作的,對于多線程程式來講,通過鎖很容易實作資源的配置設定。但對于兩個程序空間裡的程式想要複用一個 session 時,必須通過一種機制對資源進行隔離。

具體地,在 getuploadsession 的時候,必須指定這個共享這個 session 的程序數目,以及一個用來區分程序的 global id:

[1] 一次完整的上傳流程通常包括以下步驟:

先對資料進行劃分

為每個資料塊指定 block id,即調用 openrecordwriter(id)

然後用一個或多個線程分别将這些 block 上傳上去

并在某個 block 上傳失敗以後,需要對整個 block 進行重傳

在所有 block 都上傳以後,向服務端提供上傳成功的 blockid list 進行校驗,即調用 session.commit([1,2,3,...])

[2] 因為使用長連接配接,服務端有計時器判斷是否用戶端是否 alive

[3] block 在服務端有 20000 個的數量上限,如果 buffersize 設得太小會導緻 20000 個 block 很快被用光

[4] session的有效期為24小時,超過24小時會導緻資料上傳失敗