上一篇我們談論了Flink stream
source,它作為流的資料入口是整個DAG(有向無環圖)拓撲的起點。那麼與此對應的,流的資料出口就是跟source對應的Sink。這是我們本篇解讀的内容。
跟<code>SourceFunction</code>對應,Flink針對Sink的根接口被稱為<code>SinkFunction</code>。繼承自<code>Function</code>這一标記接口。<code>SinkFunction</code>接口隻提供了一個方法:
該方法提供基于記錄級别的調用(也就是每個被輸出的記錄都會調用該接口一次)。上面方法的參數<code>value</code>即為需要輸出的記錄。
<code>SinkFunction</code>相對來說比較簡潔,下面我們來看一下它的實作者。
同樣,我們先來看一下完整的類型繼承體系:
這是最簡單的SinkFunction的實作,它的實作等同于沒有實作(其實作為空方法)。它的作用就是将記錄丢棄掉。它的主要場景應該是那些無需最終處理結果的記錄。
<code>WriteSinkFunction</code>是一個抽象類。該類的主要作用是将需要輸出的<code>tuples</code>(元組)作為簡單的文本輸出到指定路徑的檔案中去,元組被收集到一個list中去,然後周期性得寫入檔案。
<code>WriteSinkFunction</code>的構造器接收兩個參數:
path : 需要寫入的檔案路徑
format : <code>WriteFormat</code>的執行個體,用于指定寫入資料的格式
在構造器中,它調用方法<code>cleanFile</code>,該方法用于初始化指定path的檔案。初始化的行為是:如果不存在則建立,如果存在則清空。
invoke方法的實作:
從實作來看,其先将需要sink的元組加入内部集合。然後調用<code>updateCondition</code>方法。該方法是<code>WriteSinkFunction</code>定義的抽象方法。用于實作判斷将tupleList寫入檔案以及清空tupleList的條件。接着将集合中的tuple寫入到指定的檔案中。最後又調用了<code>resetParameters</code>方法。該方法同樣是一個抽象方法,它的主要用途是當寫入的場景是批量寫入時,可能會有一些狀态參數,該方法就是用于對狀态進行reset。
該類是<code>WriteSinkFunction</code>的實作類。它支援以指定的毫秒數間隔将tuple批量寫入檔案。間隔由構造器參數<code>millis</code>指定。在内部,<code>WriteSinkFunction</code>以<code>lastTime</code>維護上一次寫入的時間狀态。它主要涉及上面提到的兩個抽象方法的實作:
<code>updateCondition</code>的實作很簡單,拿目前主機的目前時間戳跟上一次的執行時間戳狀态作對比:如果大于指定的間隔,則條件為真,觸發寫入。
<code>resetParameters</code>實作是先清空tupleList,然後将lastTime老的時間戳狀态覆寫為最新時間戳。
一個寫入格式的抽象類,提供了兩種實作:
WriteFormatAsText : 以原樣文本的形式寫入指定路徑的檔案
WriteFormatAsCsv : 以csv格式寫入指定檔案
<code>RichSinkFunction</code>通過繼承<code>AbstractRichFunction</code>為實作一個rich
SinkFunction提供基礎(<code>AbstractRichFunction</code>提供了一個open/close方法對,以及擷取運作時上下文對象手段)。<code>RichSinkFunction</code>也是抽象類,它有三個具體實作。
支援以socket的方式将資料發送到特定目标主機所在的伺服器作為flink stream的sink。資料被序列化為byte
array然後寫入到socket。該sink支援失敗重試模式的消息發送。該sink
可啟用<code>autoFlush</code>,如果啟用,那麼會導緻吞吐量顯著下降,但延遲也會降低。該方法的構造器,提供的參數:
hostName : 待連接配接的server的host name
port : server的端口
schema :<code>SerializationSchema</code>的執行個體,用于序列化對象。
maxNumRetries : 最大重試次數(-1為無限重試)
autoflush : 是否自動flush
重試的政策在<code>invoke</code>方法中,當發送失敗時進入到異常捕捉塊中進行。
一個将記錄寫入<code>OutputFormat</code>的SinkFunction的實作。
OutputFormat :定義被消費記錄的輸出接口。指定了最終的記錄如何被存儲,比如檔案就是一種存儲實作。
該實作用于将每條記錄輸出到标準輸出流(stdOut)或标準錯誤流(stdErr)。在輸出時,如果目前task的并行subtask執行個體個數大于1,也就是說目前task是并行執行的(同時存在多個執行個體),那麼在輸出每條記錄之前會輸出一個<code>prefix</code>字首。prefix為在全局上下文中目前subtask的位置。
Flink自身提供了一些針對第三方主流開源系統的連接配接器支援,它們有:
elasticsearch
flume
kafka(0.8/0.9版本)
nifi
rabbitmq
這些第三方系統(除了twitter)的sink,無一例外都是繼承自<code>RichSinkFunction</code>。
這篇文章我們主要談及了Flink的stream sink相關的設計、實作。當然這個主題還沒有完全談完,還會有後續篇幅繼續解讀。
原文釋出時間為:2016-05-07
本文作者:vinoYang