天天看點

flume kafka sparkstreaming整合後spark executor dead 及叢集報錯java.io.IOException: Connection reset by peer

簡介             整個項目架構是在CDH中,flume采集資料到kafka,然後sparkstreaming消費(flume1.7版本,kafka0.10版本,spark 2.1版本)。然後在叢集中執行的時候,發現日志中會有下面這個錯誤,但是程式并沒有死掉,結果也和本地環境測試的結果一樣。同時發現,3個spark executor中,有一個會死掉。   

java.io.IOException: Connection reset by peer         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)         at sun.nio.ch.IOUtil.read(IOUtil.java:192)         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)         at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)         at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)         at java.lang.Thread.run(Thread.java:745)

原因分析及解決           分析上面的錯誤日志,大概資訊是連接配接被對等方關閉。同時聯想到被消費的kafka topic 一共有隻有2個partition,而3個spark executor中active的剛好是2個,死了一個。初步猜測和分區數有關系,有一個executor分不到資料,是以抛出錯誤,并且被關閉。于是進行測試,建立一個topic,給了3個partition,再次啟動應用消費資料,這次便沒有報錯資訊,同時3個executor都是active了。

繼續閱讀