天天看点

spark job运行参数优化

      使用spark join两张表(5000w*500w)总是出错,报的异常显示是在shuffle阶段。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

<code>14</code><code>/</code><code>11</code><code>/</code><code>27</code> <code>12</code><code>:</code><code>05</code><code>:</code><code>49</code> <code>error storage.diskblockobjectwriter: uncaught exception </code><code>while</code> <code>reverting partial writes to file /hadoop/application_1415632483774_448143/spark-local-</code><code>20141127115224</code><code>-9ca8/</code><code>04</code><code>/shuffle_1_1562_27</code>

<code>java.io.filenotfoundexception: /hadoop/application_1415632483774_448143/spark-local-</code><code>20141127115224</code><code>-9ca8/</code><code>04</code><code>/shuffle_1_1562_27 (no such file or directory)</code>

<code>        </code><code>at java.io.fileoutputstream.open(native method)</code>

<code>        </code><code>at java.io.fileoutputstream.&lt;init&gt;(fileoutputstream.java:</code><code>212</code><code>)</code>

<code>        </code><code>at org.apache.spark.storage.diskblockobjectwriter.revertpartialwritesandclose(blockobjectwriter.scala:</code><code>178</code><code>)</code>

<code>        </code><code>at org.apache.spark.shuffle.hash.hashshufflewriter$$anonfun$revertwrites$</code><code>1</code><code>.apply(hashshufflewriter.scala:</code><code>118</code><code>)</code>

<code>        </code><code>at org.apache.spark.shuffle.hash.hashshufflewriter$$anonfun$revertwrites$</code><code>1</code><code>.apply(hashshufflewriter.scala:</code><code>117</code><code>)</code>

<code>        </code><code>at scala.collection.indexedseqoptimized$</code><code>class</code><code>.foreach(indexedseqoptimized.scala:</code><code>33</code><code>)</code>

<code>        </code><code>at scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:</code><code>108</code><code>)</code>

<code>        </code><code>at org.apache.spark.shuffle.hash.hashshufflewriter.revertwrites(hashshufflewriter.scala:</code><code>117</code><code>)</code>

<code>        </code><code>at org.apache.spark.shuffle.hash.hashshufflewriter.stop(hashshufflewriter.scala:</code><code>89</code><code>)</code>

<code>        </code><code>at org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask.scala:</code><code>73</code><code>)</code>

<code>        </code><code>at org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask.scala:</code><code>41</code><code>)</code>

<code>        </code><code>at org.apache.spark.scheduler.task.run(task.scala:</code><code>54</code><code>)</code>

<code>        </code><code>at org.apache.spark.executor.executor$taskrunner.run(executor.scala:</code><code>177</code><code>)</code>

<code>        </code><code>at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:</code><code>1145</code><code>)</code>

<code>        </code><code>at java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:</code><code>615</code><code>)</code>

<code>        </code><code>at java.lang.thread.run(thread.java:</code><code>724</code><code>)</code>

    出问题的代码块(scala)

     一般spark job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高导致的问题,因此尝试通过配置参数的方法来解决。

     此参数控制spark中通信消息的最大容量 (如task的输出结果),默认为10m。当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。尝试将此参数设置成100m后,问题未能解决。

     spark默认的shuffle采用hash模式,在hash模式下,每一次shuffle会生成m*r的数量的文件(m指的是map的数目,r指的是reduce的数目),而当map和reduce的数目开得较大时,会产生相当规模的文件,与此同时带来了大量的内存开销。

     为了降低系统资源,可以采用sort模式,sort模式只产生m数量的文件。具体可以参考:sort-based shuffle之初体验

     在我们的应用场景下,采用sort模式后,shuffle时间比之前增大了1/3,但是问题依旧未解决。

     executor堆外内存设置。起初是1024m,未能跑过,后改为4096m,job就能跑通,原因是程序使用了大量的堆外内存。

继续阅读