
Flume-ng HDFS sink internals

The HDFS sink does its main processing in the process method:

// Loop up to batchSize times, or break early once the Channel is empty
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  // take() delegates to the concrete BasicTransactionSemantics implementation
  Event event = channel.take();
  if (event == null) {
    break;
  }
  ......
  // sfWriters is an LRU cache of file handles; the maximum number of open
  // files is controlled by the hdfs.maxOpenFiles parameter
  BucketWriter bucketWriter = sfWriters.get(lookupPath);
  // If no writer is cached for this path, build one and cache it
  if (bucketWriter == null) {
    // HDFSWriterFactory builds an HDFSWriter for the configured file type,
    // controlled by the hdfs.fileType parameter, e.g. HDFSDataStream
    HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
    // idleCallback removes the writer from the LRU cache after the
    // BucketWriter has finished flushing
    bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
        batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
        suffix, codeC, compType, hdfsWriter, timedRollerPool,
        proxyTicket, sinkCounter, idleTimeout, idleCallback,
        lookupPath, callTimeout, callTimeoutPool);
    sfWriters.put(lookupPath, bucketWriter);
  }
  // Track each bucket touched within the current transaction
  if (!writers.contains(bucketWriter)) {
    writers.add(bucketWriter);
  }
  // Write the event to HDFS
  bucketWriter.append(event); ->
    open(); // if the underlying filesystem supports append, the file is
            // opened through the append interface; otherwise through create
    // Decide whether to roll the file: compare the file's actual replica
    // count against the target replica count; if the check fails, doRotate = false
    if (doRotate) {
      close();
      open();
    }
    HDFSWriter.append(event);
    if (batchCounter == batchSize) { // flush after every batchSize appends
      flush(); -> doFlush() -> HDFSWriter.sync() -> FSDataOutputStream.flush/sync
    }
}
// Flush every bucket before committing the transaction
for (BucketWriter bucketWriter : writers) {
  bucketWriter.flush();
}
transaction.commit();
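
The sfWriters cache above behaves like an access-ordered LinkedHashMap that evicts its eldest entry once hdfs.maxOpenFiles is exceeded. The sketch below only illustrates that LRU behavior; WriterCache and its fields are invented names, not Flume's actual classes:

import java.util.LinkedHashMap;
import java.util.Map;

public class WriterCache<V> extends LinkedHashMap<String, V> {
  private final int maxOpenFiles; // mirrors hdfs.maxOpenFiles

  public WriterCache(int maxOpenFiles) {
    super(16, 0.75f, true); // accessOrder = true gives LRU ordering
    this.maxOpenFiles = maxOpenFiles;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
    // Once the cache holds more than maxOpenFiles writers, drop the least
    // recently used one; the sink would close that writer's file here
    return size() > maxOpenFiles;
  }
}

Capping the cache at maxOpenFiles bounds the number of HDFS files held open at once, at the cost of closing and later reopening buckets that are written infrequently.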

Note that every BucketWriter operation, whether append, sync, or rename, is handed to a background thread pool through callWithTimeout; the size of this pool is set by the hdfs.threadsPoolSize parameter.
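
As a rough illustration of that pattern (assumed semantics, not Flume's actual source), the operation runs on a worker thread while the caller waits for at most callTimeout milliseconds; TimeoutCaller and its members are invented names:

import java.io.IOException;
import java.util.concurrent.*;

public class TimeoutCaller {
  private final ExecutorService callTimeoutPool;
  private final long callTimeout; // milliseconds, cf. hdfs.callTimeout

  public TimeoutCaller(int threads, long callTimeout) {
    this.callTimeoutPool = Executors.newFixedThreadPool(threads);
    this.callTimeout = callTimeout;
  }

  public <T> T callWithTimeout(Callable<T> op) throws Exception {
    Future<T> future = callTimeoutPool.submit(op);
    try {
      // Wait for the HDFS call, but no longer than callTimeout
      return future.get(callTimeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the stuck HDFS call
      throw new IOException("HDFS call timed out after " + callTimeout + " ms", e);
    }
  }
}

The caller still blocks on future.get, so writes are not fire-and-forget; the timeout simply bounds how long a slow DataNode or NameNode can stall the sink.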

This article is reposted from MIKE老畢's 51CTO blog. Original link: http://blog.51cto.com/boylook/1298627. Please contact the original author for reprint permission.