Spark Shuffle FetchFailedException解决方案

发布时间:2017-3-31 21:55:25编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"Spark Shuffle FetchFailedException解决方案",主要涉及到Spark Shuffle FetchFailedException解决方案方面的内容,对于Spark Shuffle FetchFailedException解决方案感兴趣的同学可以参考一下。

在大规模数据处理中,这是个比较常见的错误。

报错提示

SparkSQL shuffle操作带来的报错

org.apache.spark.shuffle.MetadataFetchFailedException:

Missing an output location for shuffle 0

org.apache.spark.shuffle.FetchFailedException:

Failed to connect to hostname/192.168.xx.xxx:50268

RDD的shuffle操作带来的报错

WARN TaskSetManager: Lost task 17.1 in stage 4.1 (TID 1386, spark050013): java.io.FileNotFoundException: /data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/2f/temp_shuffle_e22e013a-5392-4edb-9874-a196a1dad97c (没有那个文件或目录)

FetchFailed(BlockManagerId(6083b277-119a-49e8-8a49-3539690a2a3f-S155, spark050013, 8533), shuffleId=1, mapId=143, reduceId=3, message=

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data04/spark/tmp/blockmgr-817d372f-c359-4a00-96dd-8f6554aa19cd/0e/shuffle_1_143_0.data, offset=997061, length=112503}

原因

         shuffle分为shuffle write和shuffle read两部分。
         shuffle write的分区数由上一阶段的RDD分区数控制,shuffleread的分区数则是由Spark提供的一些参数控制。

         shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

         shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVMcrash也会造成长时间的gc。

解决办法

         知道原因后问题就好解决了,主要从shuffle的数据量和处理shuffle数据的分区数两个角度入手。

1.        减少shuffle数据

         思考是否可以使用map side join或是broadcastjoin来规避shuffle的产生。

将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

2.        SparkSQL和DataFrame的join,groupby等操作

         通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。

3.        Rdd的join,groupBy,reduceByKey等操作

         通过spark.default.parallelism控制shuffleread与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

4.        提高executor的内存

         通过spark.executor.memory适当提高executor的memory值。

5.        是否存在数据倾斜的问题

         空值是否已经过滤?异常数据(某个key数据特别大)是否可以单独处理?考虑改变数据分区规则。



上一篇:排序算法:二叉排序树
下一篇:Intellij Idea15 快捷键设置大全

相关文章

相关评论

本站评论功能暂时取消,后续此功能例行通知。

一、不得利用本站危害国家安全、泄露国家秘密,不得侵犯国家社会集体的和公民的合法权益,不得利用本站制作、复制和传播不法有害信息!

二、互相尊重,对自己的言论和行为负责。

好贷网好贷款