生而为人

程序员的自我修养

0%

参数优化

触发spark合并小文件的阈值(1M)

SET spark.sql.mergeSmallFileSize=104857600;

spark shuffle默认分区数

set spark.sql.shuffle.partitions=3000;
set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.minExecutors=100;
set spark.dynamicAllocation.maxExecutors=1000;
set spark.speculation.multiplier=1.5;
set spark.speculation.quantile=0.85;
set spark.driver.memory=8g;
set spark.executor.memory=13G;

2 详细内容

2.1 参数

2.1.1 资源参数
托管平台参数名 xt平台参数 含义 建议
num-executors set spark.dynamicAllocation.enabled=true;set spark.dynamicAllocation.minExecutors=10;set spark.dynamicAllocation.maxExecutors=200; 决定作业会向yarn申请多少executor,如果设置过少会导致并行度不够,过多会导致浪费资源或pending 根据数据量,以及设置的并行度(最大最慢的stage的并行度)和executor-cores来决定设置,建议根据瓶颈的stage的并行度来决定申请资源,一般使并行度=(num-executors * executor-cores)*(2~3)
executor-memory set spark.executor.memory=2g; 缓存数据、代码执行的堆内存以及JVM运行时需要的内存,堆内+对外=一个container内存大小。spark任务并不是内存申请越大执行越快,而是合理利用内存之后才会变快。 根据处理的数据量来申请,或者debug过程中,oom的情况,建议调大参数,考虑到队列资源情况,可以1G为粒度向上加,直至解决问题
set spark.yarn.executor.memoryOverhead=1024; 堆外内存,直接向系统申请,如数据传输时的netty等。 如果shuffle数据量比较大,或者排序逻辑很复杂时可以调大参数
executor-cores set spark.executor.cores=5; 决定了每个executor进程并行执行task线程的能力。因为每个cu同一时间只能执行一个task线程,与并行度相关很大。 根据 并行度=(num-executors * executor-cores)*(2~3)
driver-memory set spark.driver.memory=4g; driver的内存,如果有collect操作,或者在driver有复杂计算,或者executor num和task num很大可以多申请些。 一般4g足够了,如果executor和并行度很大,建议设置高一些,避免oom
spark.memory.fraction set spark.memory.fraction=0.6 storage和execution相加占JVM内存占比,默认0.6,剩余0.4用于用户数据结构以及spark元数据(整体内存还有约300M系统预留) 建议根据缓存数据,自定义数据结构调整比例。
spark.memory.storageFraction set spark.memory.storageFraction=0.5 storage和execution内存比例,由于spark2.0后这部分内存动态占用机制,所以这块基本可以忽略 建议忽略
spark.network.timeout set spark.network.timeout=480s; 由网络或者gc引起,worker或executor没有接收到executor或task的心跳反馈。 适当提高可以提升稳定性,shuffle很重(shuffle read,write数据量很大)的场景下可以提升点
spark.task.maxFailures set spark.task.maxFailures=6; 单个task失败次数上限,触发后job会失败 适当提高可以提升稳定性,一般不需要调整
spark.speculation set spark.speculation=true 推测执行的开关, 默认是true,推测执行的各个参数调整都会使得任务处于多申请资源,无法避免慢节点的情况,因此调整需要尽量保证测试效果。 因为是分布式计算引擎,task分散在各个服务器上运行,而每个服务器又承载不止一个任务,因为大体量的集群下,慢节点(磁盘io满,网络io满,cpu满)这种情况几乎是无法避免的,因此出现个别task在慢节点执行时,需要推测执行(将相同的task分发到别的节点去执行,避免慢节点拖垮任务)
spark.speculation.interval set spark.speculation.interval=1000ms 检查task是否需要推测执行的时间间隔, 默认1000ms
spark.speculation.multiplier set spark.speculation.multiplier=3 当task的时间超过已完成task执行时间中位数的几倍后, 才对这个task开启推测执行, 默认是3倍 谨慎调整,如果任务平均task时间都很快,且任务很重要,可以适当调低这个值,更快的发起推测执行,尽量避免慢节点
spark.speculation.quantile set spark.speculation.quantile=0.99 当一个stage中有多少task已经完成后才开启推测执行, 默认0.75(75%) 谨慎调整,如果任务平均task时间都很快,且任务很重要,可以适当调低这个值,更大范围的发起推测执行,尽量避免慢节点
2.2.2 读写文件
参数 含义 建议
set hive.exec.orc.split.strategy=HYBRID; 读取ORC表时生成split的策略//BI策略以文件为粒度进行split划分;//ETL策略会将文件进行切分,多个stripe组成一个split;//HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。//对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。//对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。//不想麻烦的可以直接使用HYBRID策略//平台默认应该是HYBRID,个人感觉一般情况下不需要调整 默认值即可,但是如果读取文件数特别大,大小也特别大时需要考虑切分策略
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=134217728 如果使用ETL策略读取orc文件,设置以上参数可以完成对小文件的合并效果~
set spark.hadoopRDD.targetBytesInPartition=134217728; //Spark在读取hive表时,默认会为每个文件创建一个task,如果一个SQL没有shuffle类型的算子,每个task执行完都会产生一个文件写回HDFS,这样就潜在存在小文件问题。//该参数可以将多个文件放到一个task中处理,默认为33554432,即如果一个文件和另一个文件大小之和小于32M,就会被放到一个task钟处理。适当提高该值,可以降低调度压力,避免无shuffle作业产生 读取文件慢的时候,可以调小
set spark.sql.mergeSmallFileSize=134217728; //如果最终写入分区目录下文件平均大小小于参数,会再启动一个map only的stage做合并,强烈建议设置该参数
2.2.3 shuffle参数
参数 spark-sql 含义 建议
spark.sql.shuffle.partitions set spark.sql.shuffle.partitions=800; 每个stage的默认task数量,也就是默认并行度。在shuffle操作时从mapper端写出的partition个数,平台默认2000 可以根据申请的executor和cores数来决定,建议代码中会对瓶颈的stage对rdd做repartition操作,
spark.sql.adaptive.enabled set spark.sql.adaptive.enabled=true 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行, 类似于repartition操作只不过是自动化的
spark.sql.adaptive.shuffle.targetPostShuffleInputSize set spark.sql.adaptive.shuffle.targetPostShuffleInputSize 开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer 同上
spark.sql.adaptive.minNumPostShufflePartitions set spark.sql.adaptive.minNumPostShufflePartitions 开启spark.sql.adaptive.enabled后,最小的分区数 同上
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 当几个stripe的大小大于该值时,会合并到一个task中处理 同上
spark.shuffle.compress set spark.shuffle.compress=true; shuffle过程是否压缩 建议开启
spark.io.compression.codec set spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec; 压缩的编码格式 建议默认值即可,这块对性能提升影响不是很大
spark.io.compression.snappy.block.size set spark.io.compression.snappy.block.size=65536; 压缩时每个block的size 建议默认值即可,这块对性能提升影响不是很大
spark.shuffle.manager=rss; set spark.shuffle.manager=rss; 使用平台的rss服务,用于超大shuffle作业的性能优化
2.2.3 其他

2.2 日志界面介绍与问题定位

http://spark-his.data.sankuai.com/history/application_1600078095841_2155359/1/stages/

模块 介绍
job 作业的基本job信息
Stages 作业划分各个stage的完成情况与并行度,以及task的执行情况
Storage 缓存的数据情况,比如cache操作
Environment 环境变量以及参数的设置
Executors 申请到的executor与task执行情况,日志,driver日志,以及堆栈
SQL 任务的dag图,以及Logical Plan ,Physical Plan等
Debug 任务资源申请的metrics

2.3 问题解决思路

ETL的优化尽量按照 分析日志 —-> 找到问题点 —-> 思考如何解决问题点 的模式进行,解决瓶颈问题需要靠设置参数,改部分逻辑联动进行,才能得到最大收益,因此一定要找到瓶颈问题对症下药。

任务优化举例:https://km.sankuai.com/page/469785434

2.3.1 oom

内存不足需要看任务执行到哪个stage,对应sql的哪块逻辑,是否有数据倾斜。

调整的话调大spark.executor.memory一般都可以解决。

2.3.2 数据倾斜

https://km.sankuai.com/page/150274845

2.3.3 资源不足

https://km.sankuai.com/page/345269968

3 一些开发上的建议

ETL:

尽量根据作业处理数据量合理设置 至少是(set spark.dynamicAllocation.maxExecutors=200; set spark.executor.memory=2g; set spark.executor.cores=5; set spark.sql.shuffle.partitions=800;)这四个参数;

读取hive表时,尽量加上分区的限制,无论是左表还是右表;

关联时,小表写在右边,尽量对小表进行cache处理;

关联时,尽量把右表的限制条件写在子查询里;

关联时,尽量避免笛卡尔积,实在无法避免的情况下,请做好分区扩张(调大spark.sql.shuffle.partitiions);

对于sql中多次使用的数据,尽量做cache处理;

使用grouping set的时,尽量做到输入最小的数据,最少的字段,尽可能通过设计数据结构避免count distinct。

RDD:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1,避免创建重复的rdd
避免以下操作
val rdd1 = sqlContext.sql("select A.a,A.b from A").rdd
rdd1.map()
val rdd2 = sqlContext.sql("select A.a,A.b from A").rdd
rdd2.reduce()
2,尽量复用同一个rdd
val rdd1 = sqlContext.sql("select A.a,A.b from A").rdd
rdd1.map(_._1).map(...)...
rdd1.filter(_._1 > 5).map(...)...
避免以下操作
val rdd2 = sqlContext.sql("select A.a,A.b from A where A.a > 5").rdd.map(...)
val rdd2 = rdd1.map(_._1)
rdd2.map(...)
3,对多次使用的rdd,尽量进行持久化
rdd.persist(storageLevel)
4,尽量避免shuffle算子,如无法避免,使用map端预聚合的shuffle算子
使用reduceByKey,aggregateByKey 替换 groupByKey
5,使用高性能算子
使用mapPartition代替map:
使用mapPartitions时,函数会一次接受一个partition的数据中计算,性能会高。
使用foreachPartitions代替foreach:
原理类似mapPartitions,在批量插入数据(mysql等)时优势明显,因为避免了每条数据都调用数据库连接。
rdd在filter后根据过滤数据量做coalesce/repartition:
6,大变量用广播
val map = Map(1 -> "a")
val mapBc = sc.broadcast(map)
避免直接在map或reduce算子中使用driver声明的大变量

4 附录

其实平台已经对spark文档做过很详细的整理了,这里附一些链接,希望能帮到大家,日常大家在优化过程中有问题也可以找zhaoxu05或者duntianlun,我们一起讨论,共同解决问题,共同学习:

总目录: https://km.sankuai.com/page/69786825

Spark UI : https://km.sankuai.com/page/237124654

常见问题:https://km.sankuai.com/page/50309540

待整理config

  1. spark.sql.shuffle.partitions