[toc]
自我介绍
面试官您好,我拥有近10年的大数据开发与企业级数仓建设经验,先后就职于去哪儿网、美团和微软担任核心开发,专注于离线与实时数仓建设和大规模数据性能优化。
项目经历方面呢,在去哪儿网期间,完整参与了服务平台的数据仓库的搭建,独立负责机票预定、支付、退改签核心域的ETL开发与数据建模,完成了日均 5 亿条数据的100多个离线任务的开发,搭建了覆盖30多张核心报表的业务流程报表体系,并且通过自动化数据质量校验将核心指标错误率从15%降至1%以下。
在美团期间,作为用户和订单两大核心域的主力开发。独立完成了核心事实表和维度表的重构,统一了 80 多个业务指标口径;并且通过数据倾斜治理、分区裁剪和性能调优,将核心订单报表产出时间从 T+2 小时缩短至 T+45 分钟,任务失败率下降65%。同时参与了实时数仓早期建设,用 Flink 开发了多个核心实时监控指标。
在微软期间,负责MSN广告数据处理。主导了从 Scope SQL 到 Spark Structured Streaming 的技术栈迁移,将 100 + 个核心离线和准实时任务从微软专有 Scope 平台迁移到开源 Spark 生态,解决了原平台扩展性不足和成本过高的问题。同时负责全链路性能优化,通过执行计划分析、算子重写和资源精细化调优,将原始 Scope SQL 任务的平均运行时间缩短 25%,迁移后的 Spark 任务性能再提升 30%。在存储优化方面,引入 ZSTD 压缩算法和细粒度分区策略,将存储成本降低 35%。此外,搭建了完整的 CI/CD 流水线和自动化数据质量监控体系,实现了任务的自动化部署、测试和告警,将任务上线周期从 3 天缩短至 4 小时,数据异常发现时间从小时级提升至分钟级。处理日均PB级多地区数据,通过ZSTD压缩、细粒度分区和Spark参数调优,将日志处理任务整体运行时间缩短30%,存储成本降低35%。
技术上,我精通 Hive、Spark、Flink、Kafka 等大数据核心技术栈,尤其擅长:
- 企业级数仓架构设计与维度建模,具备从 0 到 1 搭建离线和实时数仓的完整经验
- PB 级大规模数据性能调优,精通 Spark/Flink 底层原理和调优方法
- 数据工程化落地,熟练掌握 CI/CD、容器化和自动化运维技术
- 数据质量治理,具备搭建全链路数据质量监控体系的经验
以上就是我的自我介绍。
项目深挖
你在美团负责用户和订单域的数仓重构,统一了 80 多个业务指标口径。请具体说明:当时指标口径混乱到了什么程度?举一个最典型的例子;你是如何推动统一指标口径的?具体做了哪些工作;统一口径之后,给业务带来了什么具体的价值?
- 混乱程度: 当时就一个order_count,因为不同的计算逻辑,在不同事实表中有着超过10种的口径,导致当时下游使用报表字段的时候需要频繁确认,但依然避免不了使用错误导致的数据不一致问题。导致了我们不得不耗费大量精力去排查这些问题。
- 推动过程:
- 首先,成立了一个数据指标治理小组,成员包括了数据团队、产品、运营和财务的核心人员
- 然后,制定了统一的指标命名规范和计算逻辑规范,对于不同的统计逻辑,分别明确不同的指标定义,比如 “有效订单” 、 “支付订单”、“完成订单”等
- 接着,在数仓层面下线了所有非标准的订单表,只在DWS层保留了一个统一的订单事实表,所有下游报表的开发必须基于这个基础表。
- 最后,搭建了元数据管理平台,将所有指标的定义、计算逻辑、负责人都录入平台,并且建立了指标审批流程
- 具体价值:
- 统一口径后,数据问题的工单下降了80%,为团队每个月节省了超过200小时的问题排查时间
- 规范化口径后,再也没有出现因为口径不一致导致的业务纠纷
- 下游开发可以更方便基于元数据管理平台获取所需指标,避免了找不到指标负责人的问题,并且最终数据的可信度也大大提升
你通过数据倾斜治理、分区裁剪和 Spark SQL 调优,将核心订单报表的产出时间从 T+2 小时缩短至 T+45 分钟。请具体说明:这个核心订单报表的计算逻辑是什么?它依赖了哪些表?总数据量有多大;你是如何定位到性能瓶颈的?具体用了哪些工具和方法;请详细说明你做的每一项优化,以及每一项优化分别带来了多少性能提升;优化过程中遇到的最大的技术挑战是什么?你是如何解决的?
- 报表详情:这个核心报表是全平台活动效果分析表,计算逻辑是统计每个活动在不同用户标签、不同地区、不同时间段的曝光量、点击量、下单量、支付量及转化率。它依赖了5张核心表:活动维表、用户标签维表(拉链表)、曝光日志表、点击日志表、订单事实表,总数据量约每日3TB。
- 瓶颈定位方法:
- 通过Spark Web UI 的 DAG 图,发现整个作业有3个shuffle阶段,其中第二个shuffle阶段的运行时间占了总时间的70%
- 然后查看 Task 的执行时间,发现有 10 个 Task 的运行时间超过了 1 小时,而其他 Task 只需要 10 分钟左右,确认存在严重的数据倾斜
- 最后通过 spark.eventLog.enabled 生成的事件日志,统计出倾斜最严重的 10 个活动 ID,它们的订单量占了总订单量的30%
- 优化措施及效果
- 前置过滤优化: 在SQL最外层过滤掉了不参与活动的订单和无效的曝光日志,减少了 60% 的输入数据量,运行时间缩短了30分钟
- 分桶优化:将订单事实表和点击日志表按
活动ID进行分桶,分桶数设置为2000,避免了shuffle阶段的数据重分区,运行时间缩短了20分钟 - MapJoin优化:将活动维表和用户标签维表广播到所有 executor,在 map 端进行 join,消除了两个shuffle阶段,运行时间缩短了15分钟
- 数据倾斜治理:将倾斜最严重的10个 活动ID 单独拆分出来,使用加盐法进行处理,运行时间缩短了10分钟
- 资源调优:将 executor 的数量从 100 增加到 200, 每个 executor 的 core 数从 2 增加到 4,运行时间缩短了5分钟
- 最大技术挑战:最大的挑战是用户标签维表太大,无法直接进行MapJoin。用户标签维表有2亿条数据
用户标签维表是怎么设计的,结构是什么样的,为什么这么设计,如何更新
那你在做美团数仓时,Spark 任务数据倾斜是非常高频的问题,你当时遇到过哪些典型的倾斜场景?分别是怎么解决的?给我讲两个最典型的,要结合外卖业务场景。
在美团外卖这个项目里,你负责了活动、订单、骑手三大主题域,那在 DWD 层做维度建模时,你是怎么区分事实表和维度表的?
针对骑手这种会频繁变化的维度信息,你具体是怎么设计拉链表的?用的是全量拉链还是增量拉链?
问题 3-1:你在微软负责 广告日志处理,原来的日志处理任务是怎么设计的?存在哪些具体的性能问题?
问题 3-2:为什么选择 ZSTD 压缩而不是 Snappy 或 Gzip?它带来了多少压缩比提升和性能提升?
最终选择 ZSTD 压缩,是经过了严格的性能测试和对比的,三种压缩算法的对比如下:
- Snappy:压缩速度最快,但压缩比最低(约2:1),适合对速度要求极高,但对存储不敏感的场景
- Gzip:压缩比最高(约4.5:1),但解压速度最慢,CPU开销大,适合冷数据存储
- ZSTD:压缩比与Gzip相当(约4:1),但压缩速度是Gzip的3-5倍,CPU开销低,同时支持分块压缩和随机访问,非常适合热数据处理
除了技术优势外,微软的Cosmos平台对ZSTD有原生的优化支持,能够进一步提升性能
- 压缩比从原来的 2:1 提升到了 4:1,存储成本直接降了50%
- 由于数据量减少了一半,网络传输和磁盘IO时间也相对减少,任务整体运行时间缩短了15%
- 同时,ZSTD的分块压缩特性使得我们可以直接读取数据的一部分,而不需要解压整个文件,大大提升了数据查询效率
问题 3-3:细粒度分区是怎么设计的?原来的分区粒度是什么?为什么要改成现在的粒度?
问题 3-4:你提到的 Spark 参数调优中,哪一个参数的调整带来了最大的性能提升?为什么?请具体说明这个参数的作用,以及你是如何确定调整到什么值的。
在所有Spark参数调优中,spark.shuffle.file.buffer 这个参数的调整带来了最大的性能提升,约占总提升的25%。
参数作用:这个参数控制的是shuffle写过程中每个map任务的输出缓冲区大小,默认是32KB。当缓冲区满了之后,才会将数据溢写到磁盘上
为什么影响最大:我们的日志处理任务是典型的IO密集型任务,shuffle过程中会产生大量的磁盘IO。默认的32KB缓冲区太小,导致每个map任务会频繁地溢写磁盘,产生大量的小文件,严重影响了性能。
如何确定调整值:
- 首先通过Spark Web UI 查看shuffle阶段的磁盘IO统计,发现每个map任务平均溢写了10次以上
- 然后进行了梯度测试,分别将参数调整为64KB、128KB、256KB、512KB
- 测试结果显示,当调整到128KB时,溢写次数减少到了2次,性能提升最明显,继续增大到256KB时,性能提升不明显,反而增加了内存开销
- 最终这个参数设置为128KB
实际效果:shuffle阶段的磁盘IO减少了80%,任务整体运行时间缩短了15%
核心技术深度
问题1: 请详细说明 Spark 3.x 的统一内存管理机制。什么是存储内存和执行内存?它们之间是如何动态调整的?如果一个Spark 任务出现 executor OOM,你会按照什么步骤进行排查?
Spark 3.x 采用的是统一内存管理机制,将堆内存划分为四个部分:
-
预留内存: 300MB,用于Spark内部使用,用户无法配置
-
用户内存:占总内存的40%,
-
正确计算方式:
可用内存 = 总内存 - 预留内存 (300MB)
用户内存 = 可用内存 × (1 -
spark.memory.fraction)默认
spark.memory.fraction=0.6,所以用户内存默认是可用内存的 40%用途:用户自定义数据结构、RDD 依赖、Spark 内部元数据和防止 OOM 的安全缓冲
-
-
统一内存:占总内存的60%
正确计算方式:
统一内存 = 可用内存 ×
spark.memory.fraction 默认是可用内存的 60%
两者确实可以动态占用对方的空闲内存,但有严格的优先级和限制。
-
没有单独的 “其他内存” 区域。线程栈、JVM 元数据等都包含在用户内存中,是用户内存的一部分
存储内存和执行内存的动态调整规则
- 初始时,存储内存和执行内存各占统一内存的50%
- 当任何一方有空闲内存时,另一方都可以抢占
- 但当被抢占的一方内存不足时,还回内存的逻辑是不同的
- 对于执行内存不足时,可以直接驱逐存储内存超过它本身配置的比例的部分(即,默认情况下超过50%的部分),这部分被驱逐的内存,Spark会释放缓存中最近最少使用(LRU)的数据块,将其溢写到磁盘
- 对于存储内存不足时,只能等待执行内存释放,不能驱逐被执行内存占用的部分
Executor OOM排查步骤:
- 第一步:确定OOM类型
- 查看任务日志,确定是
java.lang.OutOfMemoryError: Java heap space还是java.lang.OutOfMemoryError: Direct buffer memory - 区分是driver OOM 还是 executor OOM
- 查看任务日志,确定是
- 第二步: 查看Spark Web UI
- 查看 Executors页面,观察每个executor内存的使用情况和GC时间
- 查看Storage页面,观察缓存的数据量和大小
- 查看Stages页面,观察每个stage的shuffle数据量和数据倾斜情况
- 第三步:分析常见原因
- 数据倾斜:某个executor处理的数据量远大于其他executor,这是最常见的原因
- shuffle数据量过大:shuffle 阶段产生的数据量超过了executor的内存
- 大对象:代码中创建了过大的对象,比如一次性collect()大量数据到driver
- 内存配置不合理:executor内存太小,或者存储内存占比太高
- 第四步:针对性解决
- 数据倾斜:使用加盐法、拆分倾斜Key等方法
- shuffle优化:调整shuffle参数,使用ZSTD压缩
- 内存配置:增加executor内存,调整spark.memory.fraction参数
- 代码优化:避免在driver端处理大量数据,使用广播变量代替大表join
问题2: Flink的CheckPoint机制是如何实现Exactly-Once语义的?什么是两阶段提交(2PC)?Flink的两阶段提交Sink是怎么工作的?
Flink的Exactly-Once语义氛围内部Exactly-Once和端到端Exactly-Once:
- 内部Exactly-Once通过Checkpoint机制实现
- 端到端Exactly-Once需要结合Checkpoint机制和两阶段提交(2PC)Sink实现
Checkpoint实现内部Exactly-Once的原理
Checkpoint的核心是barrier对齐和状态持久化:
- JobManager会定期向所有Source算子发送Checkpoint barrier
- 当 Source 算子收到 barrier 后,会暂停数据处理,将自己的状态持久化到状态后端,然后barrier发送给下游算子
- 当一个算子收到所有上游算子的barrier后,会进行barrier对齐,然后将自己的状态持久化到状态后端,再将barrier发送给下游
- 当所有算子都完成了状态持久化后,JobManager会标记这个checkpoint为成功
- 如果作业失败,Flink会从最近一次成功的Checkpoint恢复状态,重新处理数据,保证数据不会丢失也不会重复
问题:
- 如果在barrier对齐阶段,上游某一个算子保存checkpoint-1失败了,checkpoint-2发送过来了,在下游算子,checkpoint-2对齐了,具体的处理是怎样的
- 对于在barrier对齐时导致的数据反压,flink是怎么处理的
- 什么叫端到端的exectly-once
两阶段提交
两阶段提交是一种分布式事务协议,用于保证分布式系统中多个节点的数据一致性。它分为两个阶段:
- 预提交阶段:协调者向所有参与者发送预提交请求,参与者执行事务操作,并将结果返回给协调者
- 正式提交阶段:如果所有参与者都预提交成功,协调者向所有参与者发送正式提交请求,参与者提交事务;如果任何一个参与者预提交失败,协调者向所有参与者发送回滚请求,参与者回滚事务。(预提交阶段,事务就已经完成了99%,只差让修改可见,所以正式提交,仅仅是让修改可见,如果在这个阶段失败的话,参考xxx)
Flink的两阶段提交Sink工作流程
Flink的TwoPhaseCommitSinkFunction 实现了两阶段提交,它将Checkpoint和事务提交结合起来:
- 预提交阶段:当算子收到Checkpoint Barrier后,会开启一个事务,将当前批次的数据写入外部系统,但不提交事务。然后将事务ID保存到状态中,进行Checkpoint。(注意:预提交输出的数据,是当前这批数据需要Sink输出的处理结果)
- 正式提交阶段:当JobManager确认所有算子都完成了Checkpoint后,会向所有算子发送Checkpoint完成的通知。算子收到通知后,提交之前预提交的事务。
- 回滚阶段:如果checkpoint失败,算子会回滚之前预提交的事务
通过这种方式,Flink保证了只有当Checkpoint成功时,数据才会被提交到外部系统,从而实现了端到端的Exactly-Once语义
问题:
- Flink两阶段提交的具体实现是在哪里
- Flink的回滚,是只删除外部系统中之前写入的checkpoint,但是正常的处理过程不会受影响,是吗,然后会在下一个barrier重新开启一个事务是吗?
- 状态中保存事务ID的作用是什么
- 如果失败回滚了,本来可能是1分钟统计一次结果的,按照分钟作为目录输出,会不会导致失败的那分钟,没有文件,都写到下一分钟去了?
问题3: 在Hive和Spark SQL中,数据倾斜的根本原因是什么?请分别说明group by倾斜和join倾斜的解决办法,并说明每种方法的适用场景。
数据倾斜的根本原因是shuffle阶段相同key被分配到同一个reducer,而有些key的数据量要远远大于其他key,导致部分reducer处理数据量过大,而执行时间长,甚至失败
group by倾斜的解决方法和适用场景
| 解决方法 | 原理 | 适用场景 |
|---|---|---|
| Map 端预聚合 | 在 map 端先进行一次局部聚合,减少 shuffle 到 reducer 端的数据量 | 大多数 group by 场景,尤其是聚合函数是 sum、count 等可累加的情况 |
| 加盐法 | 给倾斜的 key 加上随机前缀,分散到多个 reducer 进行局部聚合,然后去掉前缀再进行全局聚合 | 少数几个 key 数据量特别大的场景 |
| 过滤无效 key | 提前过滤掉 null 值、空字符串等无效 key | 倾斜是由大量无效 key 导致的场景 |
join倾斜的解决方法及适用场景
| 解决方法 | 原理 | 适用场景 |
|---|---|---|
| MapJoin | 将小表广播到所有 executor,在 map 端进行 join,避免 shuffle | 其中一个表是小表(小于 1GB)的场景 |
| 拆分热点 key | 将倾斜的 key 单独拿出来处理,然后和其他 key 的结果合并 | 热点 key 数量很少(少于 10 个)的场景 |
| 加盐法 | 给两个表的倾斜 key 都加上随机前缀,分散到多个 reducer 进行 join | 两个都是大表,且热点 key 数量较多的场景 |
| 动态分区法 | 先统计出倾斜的 key,然后将这些 key 单独分到一个分区,其他 key 分到其他分区,分别进行 join | 热点 key 数量较多,但可以提前统计出来的场景 |
| BloomFilter 过滤 | 用 BloomFilter 过滤掉其中一个表中不存在的 key,减少 join 的数据量 | 其中一个表的 key 数量远少于另一个表的场景 |
在Hive和Spark SQL中,这些方法的实现方式略有不同,但原理是一样的。比如Spark SQL可以通过broadcast()函数实现MapJoin,而Hive可以通过设置hive.auto.convert.join=true参数自动开启MapJoin
问题:
- 如何在程序中自动识别热点key,而不用每次出现问题单独处理,处理方式可能会遇到什么问题?
- mapJoin还有哪些配置和需要注意的地方,有没有什么坑
细粒度分区策略是怎么降低存储的
场景设计题
如果让你设计一个电商平台的订单实时数仓,要求支持秒级的订单指标查询,并且保证数据的一致性和准确性。请说明你的架构设计,包括数据流向、分层设计、技术选型和关键技术点
我会采用Kappa架构来设计这个电商平台的订单实时数仓,因为Kappa架构更加简单,易于维护,能够很好地满足秒级查询和数据一致性的要求
整体数据流向
1 | 业务数据库(MySQL) -> Canal -> Kafka(ODS层) -> Flink -> Kafka(DWD层) -> Flink -> Doris(DWS/ADS层) -> 业务应用 |
分层设计
- ODS层:原始数据层,存储从Canal同步过来的订单binlog数据,保留原始的字段和格式。数据按天+小时分区存储在Kafka中,保留7天
- DWD层:数据明细层,对ODS层的数据进行清洗、过滤、脱敏和格式转换,生成统一的订单明细事实表。采用星型模型,以订单事实表为中心、关联用户、商品、商家等维度表
- DWS层:数据汇总层,按照不同的维度(用户、商品、商家、时间等)对DWD层的数据进行预聚合,生成各种宽表。比如订单按天汇总表、订单按商家汇总表等
- ADS层:数据应用层,根据业务需求,从DWS层的数据生成最终的报表和指标。比如实时订单量、实时交易额、实时转化率等
技术选型
- 数据采集:Canal,用于实时同步MySQL的binlog数据
- 消息队列:Kafka,用于解耦数据采集和数据处理,提高吞吐量和高可靠性
- 实时计算:Flink,用于数据清洗、转换和聚合,提供Exactly-Once语义保证
- 数据存储:Doris,用于存储DWS层和ADS层的数据,提供秒级的查询性能和事务支持
- 数据可视化:Grafana,用于展示实时报表和指标,还有报警监控
关键技术点
- 数据一致性保证:
- 使用Flink的Exactly-Once语义,保证数据在Flink内部不会重复也不会丢失
- 在Doris端使用幂等写入和事务支持,保证数据写入的一致性
- 对于订单状态变更的情况,使用Doris的upsert操作,保证最新的状态覆盖旧的状态
- 性能优化:
- 对于Doris的表进行合理的分区和分桶,提高查询性能
- 在Flink端进行预聚合,减少写入Doris的数据量
- 使用Drios的物化视图,提前计算好常用的指标
- 容错机制
- Kafka开启3副本机制,保证数据不丢
- Flink开启Checkpoint,每30秒保存一次作业状态
- Doris开启3副本机制,保证数据的高可用
- 监控告警:
- 监控Kafka的消息堆积量和消费延迟
- 监控Flink作业的运行状态和Checkpoint成功率
- 监控Doris的查询延迟和写入延迟
问题
- 有哪些常用的架构,优缺点,适用场景
- 现在mysql同步工具最好的是不是Flink?比较之下优点有哪些