HDFSSink组件中,主要由HDFSEventSink,BucketWriter,HDFSWriter几个类构成。
其中HDFSEventSink主要功能呢是判定Sink的配置条件是否合法,并负责从Channel中获取events,通过解析event的header信息决定event对应的BucketWriter。
BucketWriter负责按照rollCount,rollSize等条件在HDFS端生成(roll)文件,通过配置文件配置的文件数据格式以及序列化的方式,在每个BucetWriter同一处理。
HDFSWriter作为接口,其具体实现有HDFSSequenceFile,HDFSDataStream,HDFSCompressedDataStream这三种
HDFSSink功能中关键类类图
HDFSEventSink类
2、start()方法,初始化固定大小线程池callTimeoutPool, 周期执行线程池timedRollerPool,以及sfWriters,并启动sinkCounter
callTimeoutPool
timedRollerPool,周期执行线程池中主要有HDFS文件重命名的线程(根据retryInterval),达到生成文件要求进行roll操作的线程(根据idleTimeout),关闭闲置文件的线程等(rollInterval)
sfWriters sfWriters实际是一个LinkedHashMap的实现类,通过重写removeEldestEntry方法,将最久未使用的writer移除,保证sfWriters中能够维护一个固定大小(maxOpenFiles)的最大打开文件数
sinkCounter sink组件监控指标的计数器
3、process() 方法是HDFSEventSink中最主要的逻辑(部分关键节点通过注释写到代码中),
process()方法中获取到Channel,
并按照batchSize大小循环从Channel中获取event,通过解析event得到event的header等信息,确定该event的HDFS目的路径以及目的文件名
每个event可能对应不同的bucketWriter和hdfswriter,将每个event添加到相应的writer中
当event个数达到batchSize个数后,writer进行flush,同时提交事务
其中bucketWriter负责生成(roll)文件的方式,处理文件格式以及序列化等逻辑
其中hdfsWriter具体实现有"SequenceFile","DataStream","CompressedStream";三种,用户根据hdfs.fileType参数确定具体hdfsWriter的实现
<code>public</code> <code>Status process() </code><code>throws</code> <code>EventDeliveryException {</code>
<code>Channel channel = getChannel(); </code><code>//调用父类getChannel方法,建立Channel与Sink之间的连接</code>
<code>Transaction transaction = channel.getTransaction();</code><code>//每次batch提交都建立在一个事务上</code>
<code>transaction.begin();</code>
<code>try</code> <code>{</code>
<code>Set<BucketWriter> writers = </code><code>new</code> <code>LinkedHashSet<>();</code>
<code>int</code> <code>txnEventCount = </code><code>0</code><code>;</code>
<code>for</code> <code>(txnEventCount = </code><code>0</code><code>; txnEventCount < batchSize; txnEventCount++) {</code>
<code>Event event = channel.take();</code><code>//从Channel中取出event</code>
<code>if</code> <code>(event == </code><code>null</code><code>) {</code><code>//没有新的event的时候,则不需要按照batchSize循环取</code>
<code>break</code><code>;</code>
<code>}</code>
<code>// reconstruct the path name by substituting place holders</code>
<code>// 在配置文件中会有“a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S”这样的%表示的变量</code>
<code>// 解析配置文件中的变量构造realPath 和 realName</code>
<code>String realPath = BucketPath.escapeString(filePath, event.getHeaders(),</code>
<code>timeZone, needRounding, roundUnit, roundValue, useLocalTime);</code>
<code>String realName = BucketPath.escapeString(fileName, event.getHeaders(),</code>
<code>String lookupPath = realPath + DIRECTORY_DELIMITER + realName;</code>
<code>BucketWriter bucketWriter;</code>
<code>HDFSWriter hdfsWriter = </code><code>null</code><code>;</code>
<code>WriterCallback closeCallback = </code><code>new</code> <code>WriterCallback() {</code>
<code>@Override</code>
<code>public</code> <code>void</code> <code>run(String bucketPath) {</code>
<code>LOG.info(</code><code>"Writer callback called."</code><code>);</code>
<code>synchronized</code> <code>(sfWritersLock) {</code>
<code>sfWriters.remove(bucketPath);</code><code>//sfWriters以LRU方式维护了一个maxOpenFiles大小的map.始终保持最多打开文件个数</code>
<code>};</code>
<code>bucketWriter = sfWriters.get(lookupPath);</code>
<code>// we haven't seen this file yet, so open it and cache the handle</code>
<code>if</code> <code>(bucketWriter == </code><code>null</code><code>) {</code>
<code>hdfsWriter = writerFactory.getWriter(fileType);</code><code>//通过工厂获取文件类型,其中包括"SequenceFile","DataStream","CompressedStream";</code>
<code>bucketWriter = initializeBucketWriter(realPath, realName,</code>
<code>lookupPath, hdfsWriter, closeCallback);</code>
<code>sfWriters.put(lookupPath, bucketWriter);</code>
<code>// Write the data to HDFS</code>
<code>bucketWriter.append(event);</code>
<code>} </code><code>catch</code> <code>(BucketClosedException ex) {</code>
<code>LOG.info(</code><code>"Bucket was closed while trying to append, "</code> <code>+</code>
<code>"reinitializing bucket and writing event."</code><code>);</code>
<code>hdfsWriter = writerFactory.getWriter(fileType);</code>
<code>// track the buckets getting written in this transaction</code>
<code>if</code> <code>(!writers.contains(bucketWriter)) {</code>
<code>writers.add(bucketWriter);</code>
<code>if</code> <code>(txnEventCount == </code><code>0</code><code>) {</code>
<code>sinkCounter.incrementBatchEmptyCount();</code>
<code>} </code><code>else</code> <code>if</code> <code>(txnEventCount == batchSize) {</code>
<code>sinkCounter.incrementBatchCompleteCount();</code>
<code>} </code><code>else</code> <code>{</code>
<code>sinkCounter.incrementBatchUnderflowCount();</code>
<code>// flush all pending buckets before committing the transaction</code>
<code>for</code> <code>(BucketWriter bucketWriter : writers) {</code>
<code>bucketWriter.flush();</code>
<code>transaction.commit();</code>
<code>if</code> <code>(txnEventCount < </code><code>1</code><code>) {</code>
<code>return</code> <code>Status.BACKOFF;</code>
<code>sinkCounter.addToEventDrainSuccessCount(txnEventCount);</code>
<code>return</code> <code>Status.READY;</code>
<code>} </code><code>catch</code> <code>(IOException eIO) {</code>
<code>transaction.rollback();</code>
<code>LOG.warn(</code><code>"HDFS IO error"</code><code>, eIO);</code>
<code>} </code><code>catch</code> <code>(Throwable th) {</code>
<code>LOG.error(</code><code>"process failed"</code><code>, th);</code>
<code>if</code> <code>(th </code><code>instanceof</code> <code>Error) {</code>
<code>throw</code> <code>(Error) th;</code>
<code>throw</code> <code>new</code> <code>EventDeliveryException(th);</code>
<code>} </code><code>finally</code> <code>{</code>
<code>transaction.close();</code>
BucketWriter
flush() 方法:
BucketWriter中维护了一个batchCounter,在这个batchCounter大小不为0的时候会进行doFlush(), doFlush()主要就是对batch中的event进行序列化和输出流flush操作,最终结果就是将events写入HDFS中。
如果用户设置了idleTimeout参数不为0,在doFlush()操作之后,会往定时执行线程池中添加一个任务,该关闭当前连接HDFS的输出对象HDFSWriter,执行时间间隔为idleTimeout,并将这个延迟调度的任务赋值给idleFuture变量。
append()方法:
在介绍flush()方法中,会介绍一个idleFuture变量对应的功能,在append()方法执行前首先会检查idleFuture任务是否执行完毕,如果没有执行完成会设置一个超时时间callTimeout等待该进程完成,然后再进行append之后的操作。这样做主要是为了防止关闭HdfsWriter的过程中还在往HDFS中append数据,在append一半时候,HdfsWriter关闭了。
之后,在正是append()之前,又要首先检查当前是否存在HDFSWirter可用于append操作,如果没有调用open()方法。
每次将event往hdfs中append的时候都需要对rollCount,rollSize两个参数进行检查,在满足这两个参数条件的情况下,就需要将临时文件重命名为(roll)正式的HDFS文件。之后,重新再open一个hdfswriter,往这个hdfswriter中append每个event,当event个数达到batchSize时,进行flush操作。
<code>public</code> <code>synchronized</code> <code>void</code> <code>append(</code><code>final</code> <code>Event event) </code><code>throws</code> <code>IOException, InterruptedException {</code>
<code>checkAndThrowInterruptedException();</code>
<code>// idleFuture是ScheduledFuture实例,主要功能关闭当前HDFSWriter,在append event之前需要判断</code>
<code>// idleFuture是否已经执行完成,否则会造成在append一半的时候 hdfswriter被关闭</code>
<code>if</code> <code>(idleFuture != </code><code>null</code><code>) {</code>
<code>idleFuture.cancel(</code><code>false</code><code>);</code>
<code>// There is still a small race condition - if the idleFuture is already</code>
<code>// running, interrupting it can cause HDFS close operation to throw -</code>
<code>// so we cannot interrupt it while running. If the future could not be</code>
<code>// cancelled, it is already running - wait for it to finish before</code>
<code>// attempting to write.</code>
<code>if</code> <code>(!idleFuture.isDone()) {</code>
<code>idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);</code>
<code>} </code><code>catch</code> <code>(TimeoutException ex) {</code>
<code>LOG.warn(</code><code>"Timeout while trying to cancel closing of idle file. Idle"</code> <code>+</code>
<code>" file close may have failed"</code><code>, ex);</code>
<code>} </code><code>catch</code> <code>(Exception ex) {</code>
<code>LOG.warn(</code><code>"Error while trying to cancel closing of idle file. "</code><code>, ex);</code>
<code>idleFuture = </code><code>null</code><code>;</code>
<code>// If the bucket writer was closed due to roll timeout or idle timeout,</code>
<code>// force a new bucket writer to be created. Roll count and roll size will</code>
<code>// just reuse this one</code>
<code>if</code> <code>(!isOpen) {</code>
<code>if</code> <code>(closed) {</code>
<code>throw</code> <code>new</code> <code>BucketClosedException(</code><code>"This bucket writer was closed and "</code> <code>+</code>
<code>"this handle is thus no longer valid"</code><code>);</code>
<code>open();</code>
<code>// 检查rollCount,rollSize两个roll文件的参数,判断是否roll出新文件</code>
<code>if</code> <code>(shouldRotate()) {</code>
<code>boolean</code> <code>doRotate = </code><code>true</code><code>;</code>
<code>if</code> <code>(isUnderReplicated) {</code>
<code>if</code> <code>(maxConsecUnderReplRotations > </code><code>0</code> <code>&&</code>
<code>consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {</code>
<code>doRotate = </code><code>false</code><code>;</code>
<code>if</code> <code>(consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {</code>
<code>LOG.error(</code><code>"Hit max consecutive under-replication rotations ({}); "</code> <code>+</code>
<code>"will not continue rolling files under this path due to "</code> <code>+</code>
<code>"under-replication"</code><code>, maxConsecUnderReplRotations);</code>
<code>LOG.warn(</code><code>"Block Under-replication detected. Rotating file."</code><code>);</code>
<code>consecutiveUnderReplRotateCount++;</code>
<code>consecutiveUnderReplRotateCount = </code><code>0</code><code>;</code>
<code>if</code> <code>(doRotate) {</code>
<code>close();</code>
<code>// write the event</code>
<code>sinkCounter.incrementEventDrainAttemptCount();</code><code>// sinkCounter统计metrix</code>
<code>callWithTimeout(</code><code>new</code> <code>CallRunner<Void>() {</code>
<code>public</code> <code>Void call() </code><code>throws</code> <code>Exception {</code>
<code>writer.append(event); </code><code>//writer是通过配置参数hdfs.fileType创建的HDFSWriter实现</code>
<code>return</code> <code>null</code><code>;</code>
<code>});</code>
<code>} </code><code>catch</code> <code>(IOException e) {</code>
<code>LOG.warn(</code><code>"Caught IOException writing to HDFSWriter ({}). Closing file ("</code> <code>+</code>
<code>bucketPath + </code><code>") and rethrowing exception."</code><code>,</code>
<code>e.getMessage());</code>
<code>close(</code><code>true</code><code>);</code>
<code>} </code><code>catch</code> <code>(IOException e2) {</code>
<code>LOG.warn(</code><code>"Caught IOException while closing file ("</code> <code>+</code>
<code>bucketPath + </code><code>"). Exception follows."</code><code>, e2);</code>
<code>throw</code> <code>e;</code>
<code>// update statistics</code>
<code>processSize += event.getBody().length;</code>
<code>eventCounter++;</code>
<code>batchCounter++;</code>
<code>if</code> <code>(batchCounter == batchSize) {</code>
<code>flush();</code>
本文转自巧克力黒 51CTO博客,原文链接:http://blog.51cto.com/10120275/2052970,如需转载请自行联系原作者