触发spark合并小文件的阈值(1M)
SET spark.sql.mergeSmallFileSize=104857600;
spark shuffle默认分区数
set spark.sql.shuffle.partitions=3000;
set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.minExecutors=100;
set spark.dynamicAllocation.maxExecutors=1000;
set spark.speculation.multiplier=1.5;
set spark.speculation.quantile=0.85;
set spark.driver.memory=8g;
set spark.executor.memory=13G;
2 详细内容
2.1 参数
2.1.1 资源参数
| 托管平台参数名 | xt平台参数 | 含义 | 建议 |
|---|---|---|---|
| num-executors | set spark.dynamicAllocation.enabled=true;set spark.dynamicAllocation.minExecutors=10;set spark.dynamicAllocation.maxExecutors=200; | 决定作业会向yarn申请多少executor,如果设置过少会导致并行度不够,过多会导致浪费资源或pending | 根据数据量,以及设置的并行度(最大最慢的stage的并行度)和executor-cores来决定设置,建议根据瓶颈的stage的并行度来决定申请资源,一般使并行度=(num-executors * executor-cores)*(2~3) |
| executor-memory | set spark.executor.memory=2g; | 缓存数据、代码执行的堆内存以及JVM运行时需要的内存,堆内+对外=一个container内存大小。spark任务并不是内存申请越大执行越快,而是合理利用内存之后才会变快。 | 根据处理的数据量来申请,或者debug过程中,oom的情况,建议调大参数,考虑到队列资源情况,可以1G为粒度向上加,直至解决问题 |
| set spark.yarn.executor.memoryOverhead=1024; | 堆外内存,直接向系统申请,如数据传输时的netty等。 | 如果shuffle数据量比较大,或者排序逻辑很复杂时可以调大参数 | |
| executor-cores | set spark.executor.cores=5; | 决定了每个executor进程并行执行task线程的能力。因为每个cu同一时间只能执行一个task线程,与并行度相关很大。 | 根据 并行度=(num-executors * executor-cores)*(2~3) |
| driver-memory | set spark.driver.memory=4g; | driver的内存,如果有collect操作,或者在driver有复杂计算,或者executor num和task num很大可以多申请些。 | 一般4g足够了,如果executor和并行度很大,建议设置高一些,避免oom |
| spark.memory.fraction | set spark.memory.fraction=0.6 | storage和execution相加占JVM内存占比,默认0.6,剩余0.4用于用户数据结构以及spark元数据(整体内存还有约300M系统预留) | 建议根据缓存数据,自定义数据结构调整比例。 |
| spark.memory.storageFraction | set spark.memory.storageFraction=0.5 | storage和execution内存比例,由于spark2.0后这部分内存动态占用机制,所以这块基本可以忽略 | 建议忽略 |
| spark.network.timeout | set spark.network.timeout=480s; | 由网络或者gc引起,worker或executor没有接收到executor或task的心跳反馈。 | 适当提高可以提升稳定性,shuffle很重(shuffle read,write数据量很大)的场景下可以提升点 |
| spark.task.maxFailures | set spark.task.maxFailures=6; | 单个task失败次数上限,触发后job会失败 | 适当提高可以提升稳定性,一般不需要调整 |
| spark.speculation | set spark.speculation=true | 推测执行的开关, 默认是true,推测执行的各个参数调整都会使得任务处于多申请资源,无法避免慢节点的情况,因此调整需要尽量保证测试效果。 | 因为是分布式计算引擎,task分散在各个服务器上运行,而每个服务器又承载不止一个任务,因为大体量的集群下,慢节点(磁盘io满,网络io满,cpu满)这种情况几乎是无法避免的,因此出现个别task在慢节点执行时,需要推测执行(将相同的task分发到别的节点去执行,避免慢节点拖垮任务) |
| spark.speculation.interval | set spark.speculation.interval=1000ms | 检查task是否需要推测执行的时间间隔, 默认1000ms | |
| spark.speculation.multiplier | set spark.speculation.multiplier=3 | 当task的时间超过已完成task执行时间中位数的几倍后, 才对这个task开启推测执行, 默认是3倍 | 谨慎调整,如果任务平均task时间都很快,且任务很重要,可以适当调低这个值,更快的发起推测执行,尽量避免慢节点 |
| spark.speculation.quantile | set spark.speculation.quantile=0.99 | 当一个stage中有多少task已经完成后才开启推测执行, 默认0.75(75%) | 谨慎调整,如果任务平均task时间都很快,且任务很重要,可以适当调低这个值,更大范围的发起推测执行,尽量避免慢节点 |
2.2.2 读写文件
| 参数 | 含义 | 建议 |
|---|---|---|
| set hive.exec.orc.split.strategy=HYBRID; | 读取ORC表时生成split的策略//BI策略以文件为粒度进行split划分;//ETL策略会将文件进行切分,多个stripe组成一个split;//HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。//对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。//对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。//不想麻烦的可以直接使用HYBRID策略//平台默认应该是HYBRID,个人感觉一般情况下不需要调整 | 默认值即可,但是如果读取文件数特别大,大小也特别大时需要考虑切分策略 |
| set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=134217728 | 如果使用ETL策略读取orc文件,设置以上参数可以完成对小文件的合并效果~ | |
| set spark.hadoopRDD.targetBytesInPartition=134217728; | //Spark在读取hive表时,默认会为每个文件创建一个task,如果一个SQL没有shuffle类型的算子,每个task执行完都会产生一个文件写回HDFS,这样就潜在存在小文件问题。//该参数可以将多个文件放到一个task中处理,默认为33554432,即如果一个文件和另一个文件大小之和小于32M,就会被放到一个task钟处理。适当提高该值,可以降低调度压力,避免无shuffle作业产生 | 读取文件慢的时候,可以调小 |
| set spark.sql.mergeSmallFileSize=134217728; | //如果最终写入分区目录下文件平均大小小于参数,会再启动一个map only的stage做合并,强烈建议设置该参数 |
2.2.3 shuffle参数
| 参数 | spark-sql | 含义 | 建议 |
|---|---|---|---|
| spark.sql.shuffle.partitions | set spark.sql.shuffle.partitions=800; | 每个stage的默认task数量,也就是默认并行度。在shuffle操作时从mapper端写出的partition个数,平台默认2000 | 可以根据申请的executor和cores数来决定,建议代码中会对瓶颈的stage对rdd做repartition操作, |
| spark.sql.adaptive.enabled | set spark.sql.adaptive.enabled=true | 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行, | 类似于repartition操作只不过是自动化的 |
| spark.sql.adaptive.shuffle.targetPostShuffleInputSize | set spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer | 同上 |
| spark.sql.adaptive.minNumPostShufflePartitions | set spark.sql.adaptive.minNumPostShufflePartitions | 开启spark.sql.adaptive.enabled后,最小的分区数 | 同上 |
| set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize | 当几个stripe的大小大于该值时,会合并到一个task中处理 | 同上 | |
| spark.shuffle.compress | set spark.shuffle.compress=true; | shuffle过程是否压缩 | 建议开启 |
| spark.io.compression.codec | set spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec; | 压缩的编码格式 | 建议默认值即可,这块对性能提升影响不是很大 |
| spark.io.compression.snappy.block.size | set spark.io.compression.snappy.block.size=65536; | 压缩时每个block的size | 建议默认值即可,这块对性能提升影响不是很大 |
| spark.shuffle.manager=rss; | set spark.shuffle.manager=rss; | 使用平台的rss服务,用于超大shuffle作业的性能优化 |
2.2.3 其他
2.2 日志界面介绍与问题定位
http://spark-his.data.sankuai.com/history/application_1600078095841_2155359/1/stages/
| 模块 | 介绍 | |||
|---|---|---|---|---|
| job | 作业的基本job信息 | |||
| Stages | 作业划分各个stage的完成情况与并行度,以及task的执行情况 | |||
| Storage | 缓存的数据情况,比如cache操作 | |||
| Environment | 环境变量以及参数的设置 | |||
| Executors | 申请到的executor与task执行情况,日志,driver日志,以及堆栈 | |||
| SQL | 任务的dag图,以及Logical Plan ,Physical Plan等 | |||
| Debug | 任务资源申请的metrics |
2.3 问题解决思路
ETL的优化尽量按照 分析日志 —-> 找到问题点 —-> 思考如何解决问题点 的模式进行,解决瓶颈问题需要靠设置参数,改部分逻辑联动进行,才能得到最大收益,因此一定要找到瓶颈问题对症下药。
任务优化举例:https://km.sankuai.com/page/469785434
2.3.1 oom
内存不足需要看任务执行到哪个stage,对应sql的哪块逻辑,是否有数据倾斜。
调整的话调大spark.executor.memory一般都可以解决。
2.3.2 数据倾斜
https://km.sankuai.com/page/150274845
2.3.3 资源不足
https://km.sankuai.com/page/345269968
3 一些开发上的建议
ETL:
尽量根据作业处理数据量合理设置 至少是(set spark.dynamicAllocation.maxExecutors=200; set spark.executor.memory=2g; set spark.executor.cores=5; set spark.sql.shuffle.partitions=800;)这四个参数;
读取hive表时,尽量加上分区的限制,无论是左表还是右表;
关联时,小表写在右边,尽量对小表进行cache处理;
关联时,尽量把右表的限制条件写在子查询里;
关联时,尽量避免笛卡尔积,实在无法避免的情况下,请做好分区扩张(调大spark.sql.shuffle.partitiions);
对于sql中多次使用的数据,尽量做cache处理;
使用grouping set的时,尽量做到输入最小的数据,最少的字段,尽可能通过设计数据结构避免count distinct。
RDD:
1 | 1,避免创建重复的rdd |
4 附录
其实平台已经对spark文档做过很详细的整理了,这里附一些链接,希望能帮到大家,日常大家在优化过程中有问题也可以找zhaoxu05或者duntianlun,我们一起讨论,共同解决问题,共同学习:
总目录: https://km.sankuai.com/page/69786825
Spark UI : https://km.sankuai.com/page/237124654
常见问题:https://km.sankuai.com/page/50309540
待整理config
- spark.sql.shuffle.partitions