天天看點

【Flume】HDFSSink源碼了解

HDFSSink元件中,主要由HDFSEventSink,BucketWriter,HDFSWriter幾個類構成。

其中HDFSEventSink主要功能呢是判定Sink的配置條件是否合法,并負責從Channel中擷取events,通過解析event的header資訊決定event對應的BucketWriter。

BucketWriter負責按照rollCount,rollSize等條件在HDFS端生成(roll)檔案,通過配置檔案配置的檔案資料格式以及序列化的方式,在每個BucetWriter同一處理。

HDFSWriter作為接口,其具體實作有HDFSSequenceFile,HDFSDataStream,HDFSCompressedDataStream這三種

HDFSSink功能中關鍵類類圖

HDFSEventSink類

2、start()方法,初始化固定大小線程池callTimeoutPool, 周期執行線程池timedRollerPool,以及sfWriters,并啟動sinkCounter

callTimeoutPool

timedRollerPool,周期執行線程池中主要有HDFS檔案重命名的線程(根據retryInterval),達到生成檔案要求進行roll操作的線程(根據idleTimeout),關閉閑置檔案的線程等(rollInterval)

sfWriters  sfWriters實際是一個LinkedHashMap的實作類,通過重寫removeEldestEntry方法,将最久未使用的writer移除,保證sfWriters中能夠維護一個固定大小(maxOpenFiles)的最大打開檔案數

sinkCounter sink元件監控名額的計數器

3、process() 方法是HDFSEventSink中最主要的邏輯(部分關鍵節點通過注釋寫到代碼中),

process()方法中擷取到Channel,

并按照batchSize大小循環從Channel中擷取event,通過解析event得到event的header等資訊,确定該event的HDFS目的路徑以及目的檔案名

每個event可能對應不同的bucketWriter和hdfswriter,将每個event添加到相應的writer中

當event個數達到batchSize個數後,writer進行flush,同時送出事務

其中bucketWriter負責生成(roll)檔案的方式,處理檔案格式以及序列化等邏輯

其中hdfsWriter具體實作有"SequenceFile","DataStream","CompressedStream";三種,使用者根據hdfs.fileType參數确定具體hdfsWriter的實作

<code>public</code> <code>Status process() </code><code>throws</code> <code>EventDeliveryException {</code>

<code>Channel channel = getChannel(); </code><code>//調用父類getChannel方法,建立Channel與Sink之間的連接配接</code>

<code>Transaction transaction = channel.getTransaction();</code><code>//每次batch送出都建立在一個事務上</code>

<code>transaction.begin();</code>

<code>try</code> <code>{</code>

<code>Set&lt;BucketWriter&gt; writers = </code><code>new</code> <code>LinkedHashSet&lt;&gt;();</code>

<code>int</code> <code>txnEventCount = </code><code>0</code><code>;</code>

<code>for</code> <code>(txnEventCount = </code><code>0</code><code>; txnEventCount &lt; batchSize; txnEventCount++) {</code>

<code>Event event = channel.take();</code><code>//從Channel中取出event</code>

<code>if</code> <code>(event == </code><code>null</code><code>) {</code><code>//沒有新的event的時候,則不需要按照batchSize循環取</code>

<code>break</code><code>;</code>

<code>}</code>

<code>// reconstruct the path name by substituting place holders</code>

<code>// 在配置檔案中會有“a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S”這樣的%表示的變量</code>

<code>// 解析配置檔案中的變量構造realPath 和 realName</code>

<code>String realPath = BucketPath.escapeString(filePath, event.getHeaders(),</code>

<code>timeZone, needRounding, roundUnit, roundValue, useLocalTime);</code>

<code>String realName = BucketPath.escapeString(fileName, event.getHeaders(),</code>

<code>String lookupPath = realPath + DIRECTORY_DELIMITER + realName;</code>

<code>BucketWriter bucketWriter;</code>

<code>HDFSWriter hdfsWriter = </code><code>null</code><code>;</code>

<code>WriterCallback closeCallback = </code><code>new</code> <code>WriterCallback() {</code>

<code>@Override</code>

<code>public</code> <code>void</code> <code>run(String bucketPath) {</code>

<code>LOG.info(</code><code>"Writer callback called."</code><code>);</code>

<code>synchronized</code> <code>(sfWritersLock) {</code>

<code>sfWriters.remove(bucketPath);</code><code>//sfWriters以LRU方式維護了一個maxOpenFiles大小的map.始終保持最多打開檔案個數</code>

<code>};</code>

<code>bucketWriter = sfWriters.get(lookupPath);</code>

<code>// we haven't seen this file yet, so open it and cache the handle</code>

<code>if</code> <code>(bucketWriter == </code><code>null</code><code>) {</code>

<code>hdfsWriter = writerFactory.getWriter(fileType);</code><code>//通過工廠擷取檔案類型,其中包括"SequenceFile","DataStream","CompressedStream";</code>

<code>bucketWriter = initializeBucketWriter(realPath, realName,</code>

<code>lookupPath, hdfsWriter, closeCallback);</code>

<code>sfWriters.put(lookupPath, bucketWriter);</code>

<code>// Write the data to HDFS</code>

<code>bucketWriter.append(event);</code>

<code>} </code><code>catch</code> <code>(BucketClosedException ex) {</code>

<code>LOG.info(</code><code>"Bucket was closed while trying to append, "</code> <code>+</code>

<code>"reinitializing bucket and writing event."</code><code>);</code>

<code>hdfsWriter = writerFactory.getWriter(fileType);</code>

<code>// track the buckets getting written in this transaction</code>

<code>if</code> <code>(!writers.contains(bucketWriter)) {</code>

<code>writers.add(bucketWriter);</code>

<code>if</code> <code>(txnEventCount == </code><code>0</code><code>) {</code>

<code>sinkCounter.incrementBatchEmptyCount();</code>

<code>} </code><code>else</code> <code>if</code> <code>(txnEventCount == batchSize) {</code>

<code>sinkCounter.incrementBatchCompleteCount();</code>

<code>} </code><code>else</code> <code>{</code>

<code>sinkCounter.incrementBatchUnderflowCount();</code>

<code>// flush all pending buckets before committing the transaction</code>

<code>for</code> <code>(BucketWriter bucketWriter : writers) {</code>

<code>bucketWriter.flush();</code>

<code>transaction.commit();</code>

<code>if</code> <code>(txnEventCount &lt; </code><code>1</code><code>) {</code>

<code>return</code> <code>Status.BACKOFF;</code>

<code>sinkCounter.addToEventDrainSuccessCount(txnEventCount);</code>

<code>return</code> <code>Status.READY;</code>

<code>} </code><code>catch</code> <code>(IOException eIO) {</code>

<code>transaction.rollback();</code>

<code>LOG.warn(</code><code>"HDFS IO error"</code><code>, eIO);</code>

<code>} </code><code>catch</code> <code>(Throwable th) {</code>

<code>LOG.error(</code><code>"process failed"</code><code>, th);</code>

<code>if</code> <code>(th </code><code>instanceof</code> <code>Error) {</code>

<code>throw</code> <code>(Error) th;</code>

<code>throw</code> <code>new</code> <code>EventDeliveryException(th);</code>

<code>} </code><code>finally</code> <code>{</code>

<code>transaction.close();</code>

BucketWriter

flush() 方法:

BucketWriter中維護了一個batchCounter,在這個batchCounter大小不為0的時候會進行doFlush(), doFlush()主要就是對batch中的event進行序列化和輸出流flush操作,最終結果就是将events寫入HDFS中。

如果使用者設定了idleTimeout參數不為0,在doFlush()操作之後,會往定時執行線程池中添加一個任務,該關閉目前連接配接HDFS的輸出對象HDFSWriter,執行時間間隔為idleTimeout,并将這個延遲排程的任務指派給idleFuture變量。

append()方法:

在介紹flush()方法中,會介紹一個idleFuture變量對應的功能,在append()方法執行前首先會檢查idleFuture任務是否執行完畢,如果沒有執行完成會設定一個逾時時間callTimeout等待該程序完成,然後再進行append之後的操作。這樣做主要是為了防止關閉HdfsWriter的過程中還在往HDFS中append資料,在append一半時候,HdfsWriter關閉了。

之後,在正是append()之前,又要首先檢查目前是否存在HDFSWirter可用于append操作,如果沒有調用open()方法。

每次将event往hdfs中append的時候都需要對rollCount,rollSize兩個參數進行檢查,在滿足這兩個參數條件的情況下,就需要将臨時檔案重命名為(roll)正式的HDFS檔案。之後,重新再open一個hdfswriter,往這個hdfswriter中append每個event,當event個數達到batchSize時,進行flush操作。

<code>public</code> <code>synchronized</code> <code>void</code> <code>append(</code><code>final</code> <code>Event event) </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>checkAndThrowInterruptedException();</code>

<code>// idleFuture是ScheduledFuture執行個體,主要功能關閉目前HDFSWriter,在append event之前需要判斷</code>

<code>// idleFuture是否已經執行完成,否則會造成在append一半的時候 hdfswriter被關閉</code>

<code>if</code> <code>(idleFuture != </code><code>null</code><code>) {</code>

<code>idleFuture.cancel(</code><code>false</code><code>);</code>

<code>// There is still a small race condition - if the idleFuture is already</code>

<code>// running, interrupting it can cause HDFS close operation to throw -</code>

<code>// so we cannot interrupt it while running. If the future could not be</code>

<code>// cancelled, it is already running - wait for it to finish before</code>

<code>// attempting to write.</code>

<code>if</code> <code>(!idleFuture.isDone()) {</code>

<code>idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);</code>

<code>} </code><code>catch</code> <code>(TimeoutException ex) {</code>

<code>LOG.warn(</code><code>"Timeout while trying to cancel closing of idle file. Idle"</code> <code>+</code>

<code>" file close may have failed"</code><code>, ex);</code>

<code>} </code><code>catch</code> <code>(Exception ex) {</code>

<code>LOG.warn(</code><code>"Error while trying to cancel closing of idle file. "</code><code>, ex);</code>

<code>idleFuture = </code><code>null</code><code>;</code>

<code>// If the bucket writer was closed due to roll timeout or idle timeout,</code>

<code>// force a new bucket writer to be created. Roll count and roll size will</code>

<code>// just reuse this one</code>

<code>if</code> <code>(!isOpen) {</code>

<code>if</code> <code>(closed) {</code>

<code>throw</code> <code>new</code> <code>BucketClosedException(</code><code>"This bucket writer was closed and "</code> <code>+</code>

<code>"this handle is thus no longer valid"</code><code>);</code>

<code>open();</code>

<code>// 檢查rollCount,rollSize兩個roll檔案的參數,判斷是否roll出新檔案</code>

<code>if</code> <code>(shouldRotate()) {</code>

<code>boolean</code> <code>doRotate = </code><code>true</code><code>;</code>

<code>if</code> <code>(isUnderReplicated) {</code>

<code>if</code> <code>(maxConsecUnderReplRotations &gt; </code><code>0</code> <code>&amp;&amp;</code>

<code>consecutiveUnderReplRotateCount &gt;= maxConsecUnderReplRotations) {</code>

<code>doRotate = </code><code>false</code><code>;</code>

<code>if</code> <code>(consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {</code>

<code>LOG.error(</code><code>"Hit max consecutive under-replication rotations ({}); "</code> <code>+</code>

<code>"will not continue rolling files under this path due to "</code> <code>+</code>

<code>"under-replication"</code><code>, maxConsecUnderReplRotations);</code>

<code>LOG.warn(</code><code>"Block Under-replication detected. Rotating file."</code><code>);</code>

<code>consecutiveUnderReplRotateCount++;</code>

<code>consecutiveUnderReplRotateCount = </code><code>0</code><code>;</code>

<code>if</code> <code>(doRotate) {</code>

<code>close();</code>

<code>// write the event</code>

<code>sinkCounter.incrementEventDrainAttemptCount();</code><code>// sinkCounter統計metrix</code>

<code>callWithTimeout(</code><code>new</code> <code>CallRunner&lt;Void&gt;() {</code>

<code>public</code> <code>Void call() </code><code>throws</code> <code>Exception {</code>

<code>writer.append(event); </code><code>//writer是通過配置參數hdfs.fileType建立的HDFSWriter實作</code>

<code>return</code> <code>null</code><code>;</code>

<code>});</code>

<code>} </code><code>catch</code> <code>(IOException e) {</code>

<code>LOG.warn(</code><code>"Caught IOException writing to HDFSWriter ({}). Closing file ("</code> <code>+</code>

<code>bucketPath + </code><code>") and rethrowing exception."</code><code>,</code>

<code>e.getMessage());</code>

<code>close(</code><code>true</code><code>);</code>

<code>} </code><code>catch</code> <code>(IOException e2) {</code>

<code>LOG.warn(</code><code>"Caught IOException while closing file ("</code> <code>+</code>

<code>bucketPath + </code><code>"). Exception follows."</code><code>, e2);</code>

<code>throw</code> <code>e;</code>

<code>// update statistics</code>

<code>processSize += event.getBody().length;</code>

<code>eventCounter++;</code>

<code>batchCounter++;</code>

<code>if</code> <code>(batchCounter == batchSize) {</code>

<code>flush();</code>

     本文轉自巧克力黒 51CTO部落格,原文連結:http://blog.51cto.com/10120275/2052970,如需轉載請自行聯系原作者

繼續閱讀