
An example of a Flume BucketPath bug

This is a problem I ran into today while using Flume with Kerberos to write to HDFS.

The test configuration file:


agent-server1.sources = testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels = hdfs-channel

agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999

agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab

agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000

agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip

agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel

After starting the agent and sending test input over telnet, the following error appeared:


14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
 Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        ... 3 more

The stack trace shows that the error occurs in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class.

In the org.apache.flume.sink.hdfs.HDFSEventSink class, the process method builds the HDFS path. It mainly calls BucketPath.escapeString to expand the escape sequences in the path, which in turn calls replaceShorthand.

The relevant part of replaceShorthand looks like this:

  public static String replaceShorthand(char c, Map<String, String> headers,
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      ts = Long.valueOf(timestampHeader);
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }

    if (needRounding) {
      ts = roundDown(roundDown, unit, ts);
........

As the code shows, when no timestamp header can be found, the assignment to ts fails: timestampHeader is null, and Long.valueOf(null) throws the NumberFormatException seen in the stack trace above.
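
A minimal standalone snippet (hypothetical, not part of the Flume code base) reproduces that failure mode:

import java.util.HashMap;
import java.util.Map;

public class MissingTimestampDemo {
    public static void main(String[] args) {
        // Event headers without a "timestamp" entry, which is what the netcat
        // source produces by default.
        Map<String, String> headers = new HashMap<String, String>();
        String timestampHeader = headers.get("timestamp"); // null
        // Long.valueOf(null) throws java.lang.NumberFormatException: null,
        // matching the "Caused by" line in the stack trace above.
        long ts = Long.valueOf(timestampHeader);
        System.out.println(ts);
    }
}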

This is in fact a known Flume bug:

https://issues.apache.org/jira/browse/FLUME-1419

There are three ways to deal with it:

1. Change the configuration so the HDFS path no longer uses time-based escape sequences

agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

The drawback is that logs can no longer be bucketed into per-day directories.

2. Patch the code

(patch: https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)

If no timestamp value can be found in the headers, fall back to the current system time.

The relevant code:

    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      if (timestampHeader == null) {
        ts = System.currentTimeMillis();
      } else {
        ts = Long.valueOf(timestampHeader);
      }
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }

3. Add a timestamp-based interceptor to the source

Just add two lines to the configuration:

agent-server1.sources.testtail.interceptors = i1
agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
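
Conceptually, the interceptor just stamps each event's headers with the current time before the event reaches the channel, which is exactly the header replaceShorthand looks up when expanding %Y%m%d. A simplified sketch of that behavior (not the actual Flume source):

import java.util.HashMap;
import java.util.Map;

public class TimestampInterceptorSketch {
    // Write the current time into the "timestamp" header, the key that
    // BucketPath.replaceShorthand reads for time-based bucketing.
    public static void stamp(Map<String, String> headers) {
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        stamp(headers);
        System.out.println(headers.get("timestamp")); // e.g. 1395655387000
    }
}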

One more tip:

When debugging Flume problems, you can add a startup option that sends debug-level logging to the console:

-Dflume.root.logger=DEBUG,console,LOGFILE
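
For example, appended to the flume-ng command line (the config file name here is only a placeholder for the test configuration above):

bin/flume-ng agent -n agent-server1 -c conf -f conf/test.conf -Dflume.root.logger=DEBUG,console,LOGFILE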


This article was reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1384187. Please contact the original author before reprinting.
