The core of the HDFS sink lives in its process() method. The annotated walkthrough below follows it step by step:
// loop up to batchSize times, or until the channel runs empty
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
    // take() delegates to the concrete BasicTransactionSemantics implementation
    Event event = channel.take();
    if (event == null) {
        break;
    }
    ......
    // sfWriters is an LRU cache of open-file handles; the maximum number of
    // open files is controlled by hdfs.maxOpenFiles (see the LRU sketch below)
    BucketWriter bucketWriter = sfWriters.get(lookupPath);
    // if no writer is cached for this path, construct one
    if (bucketWriter == null) {
        // HDFSWriterFactory builds an HDFSWriter matching the configured file
        // type (hdfs.fileType), e.g. HDFSDataStream for type DataStream
        HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
        // idleCallback removes the writer from the LRU once it has finished flushing
        bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
            batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
            suffix, codeC, compType, hdfsWriter, timedRollerPool,
            proxyTicket, sinkCounter, idleTimeout, idleCallback,
            lookupPath, callTimeout, callTimeoutPool);
        sfWriters.put(lookupPath, bucketWriter);
    }
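To make the eviction behaviour concrete, here is a minimal sketch of such an LRU cache built on an access-ordered LinkedHashMap. The class and field names are illustrative, not Flume's actual implementation; the point is that once the cache exceeds maxOpenFiles, the least recently used writer is closed and evicted:

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-in for the sfWriters cache: an access-ordered LinkedHashMap
// that closes and evicts the least recently used writer once more than
// maxOpenFiles entries are held. Closeable stands in for BucketWriter.
class WriterCache extends LinkedHashMap<String, Closeable> {
    private final int maxOpenFiles;

    WriterCache(int maxOpenFiles) {
        super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
        this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Closeable> eldest) {
        if (size() > maxOpenFiles) {
            try {
                eldest.getValue().close(); // release the HDFS file handle before eviction
            } catch (IOException e) {
                // swallowed here for brevity; a real sink would log this
            }
            return true; // tell LinkedHashMap to drop the eldest entry
        }
        return false;
    }
}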
    // track every bucket touched within this transaction
    if (!writers.contains(bucketWriter)) {
        writers.add(bucketWriter);
    }
    // write the event to HDFS
    bucketWriter.append(event);
Drilling into bucketWriter.append(), the call chain is roughly:

append(event) ->
    open();  // if the underlying file system supports append, the file is
             // opened through the append interface, otherwise through create
    // decide whether the log file needs to be rolled: the current replica
    // count is compared against the target replica count, and if they do not
    // match, doRotate = false (see the roll sketch below)
    if (doRotate) {
        close();
        open();
    }
    HDFSWriter.append(event);
    if (batchCounter == batchSize) {  // flush once batchSize events have accumulated
        flush(); -> doFlush() -> HDFSWriter.sync() -> FSDataOutputStream.flush()/sync()
    }
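The roll decision itself is plain counter and size bookkeeping. Below is a minimal sketch of the idea with illustrative names; the real BucketWriter additionally factors in the replica check described above and the time-based roll driven by timedRollerPool:

// Sketch of the roll decision: rotate once the configured event-count or
// byte-size threshold for the current file is reached; 0 disables a check.
class RollPolicy {
    private final long rollCount; // max events per file, 0 = unlimited
    private final long rollSize;  // max bytes per file, 0 = unlimited
    private long eventCounter;    // events written to the current file
    private long processSize;     // bytes written to the current file

    RollPolicy(long rollCount, long rollSize) {
        this.rollCount = rollCount;
        this.rollSize = rollSize;
    }

    void recordAppend(long eventBytes) {
        eventCounter++;
        processSize += eventBytes;
    }

    boolean shouldRotate() {
        boolean doRotate = false;
        if (rollCount > 0 && eventCounter >= rollCount) {
            doRotate = true; // enough events in this file
        }
        if (rollSize > 0 && processSize >= rollSize) {
            doRotate = true; // file has grown past the size threshold
        }
        return doRotate;
    }
}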
Back in process(), once the batch loop exits, every bucket touched in the transaction is flushed before the commit:

    // flush all pending buckets before committing the transaction
    for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
    }
    transaction.commit();
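For context, the take/flush/commit sequence above sits inside the standard Flume sink transaction envelope. A simplified sketch of that shape (error handling trimmed; the real method also returns Status.BACKOFF when the channel is empty):

import org.apache.flume.Channel;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink.Status;
import org.apache.flume.Transaction;

// Simplified shape of the sink's process(): the whole batch runs inside one
// channel transaction, so a failed HDFS write rolls the events back into the
// channel for redelivery instead of losing them.
public Status process(Channel channel) throws EventDeliveryException {
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
        // ... the take/append loop and the flush-all-buckets step shown above ...
        transaction.commit();
        return Status.READY;
    } catch (Throwable th) {
        transaction.rollback(); // events stay in the channel and will be retried
        throw new EventDeliveryException(th);
    } finally {
        transaction.close();
    }
}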
Note that every BucketWriter operation, whether append, sync, or rename, is submitted through callWithTimeout to a background thread pool and executed asynchronously; the pool's size is set by hdfs.threadsPoolSize.
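A minimal sketch of that pattern, assuming a plain ExecutorService (this is not Flume's exact helper, which additionally updates sink counters and handles interrupts): submit the blocking HDFS call to the pool and bound it with Future.get.

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the callWithTimeout idea: run a blocking HDFS operation on a
// background pool and give up (cancelling the task) if it exceeds the timeout.
class TimeoutCaller {
    private final ExecutorService callTimeoutPool; // sized by hdfs.threadsPoolSize
    private final long callTimeoutMs;              // hdfs.callTimeout

    TimeoutCaller(ExecutorService callTimeoutPool, long callTimeoutMs) {
        this.callTimeoutPool = callTimeoutPool;
        this.callTimeoutMs = callTimeoutMs;
    }

    <T> T callWithTimeout(Callable<T> op) throws IOException {
        Future<T> future = callTimeoutPool.submit(op);
        try {
            return future.get(callTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck HDFS call
            throw new IOException("HDFS call timed out after " + callTimeoutMs + " ms", e);
        } catch (Exception e) {
            throw new IOException("HDFS call failed", e);
        }
    }
}

The payoff of this design is that a hung DataNode stalls one pool thread rather than the sink's delivery thread, and the timeout bounds how long any single HDFS operation may block.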
This post was reproduced from MIKE老畢's 51CTO blog. Original link: http://blog.51cto.com/boylook/1298627. Please contact the original author for permission before reprinting.