生而为人

程序员的自我修养

0%

[toc]

自我介绍

面试官您好,我拥有近10年的大数据开发与企业级数仓建设经验,先后就职于去哪儿网、美团和微软担任核心开发,专注于离线与实时数仓建设和大规模数据性能优化。

项目经历方面呢,在去哪儿网期间,完整参与了服务平台的数据仓库的搭建,独立负责机票预定、支付、退改签核心域的ETL开发与数据建模,完成了日均 5 亿条数据的100多个离线任务的开发,搭建了覆盖30多张核心报表的业务流程报表体系,并且通过自动化数据质量校验将核心指标错误率从15%降至1%以下。

在美团期间,作为用户和订单两大核心域的主力开发。独立完成了核心事实表和维度表的重构,统一了 80 多个业务指标口径;并且通过数据倾斜治理、分区裁剪和性能调优,将核心订单报表产出时间从 T+2 小时缩短至 T+45 分钟,任务失败率下降65%。同时参与了实时数仓早期建设,用 Flink 开发了多个核心实时监控指标。

在微软期间,负责MSN广告数据处理。主导了从 Scope SQL 到 Spark Structured Streaming 的技术栈迁移,将 100 + 个核心离线和准实时任务从微软专有 Scope 平台迁移到开源 Spark 生态,解决了原平台扩展性不足和成本过高的问题。同时负责全链路性能优化,通过执行计划分析、算子重写和资源精细化调优,将原始 Scope SQL 任务的平均运行时间缩短 25%,迁移后的 Spark 任务性能再提升 30%。在存储优化方面,引入 ZSTD 压缩算法和细粒度分区策略,将存储成本降低 35%。此外,搭建了完整的 CI/CD 流水线和自动化数据质量监控体系,实现了任务的自动化部署、测试和告警,将任务上线周期从 3 天缩短至 4 小时,数据异常发现时间从小时级提升至分钟级。处理日均PB级多地区数据,通过ZSTD压缩、细粒度分区和Spark参数调优,将日志处理任务整体运行时间缩短30%,存储成本降低35%。

技术上,我精通 Hive、Spark、Flink、Kafka 等大数据核心技术栈,尤其擅长:

  • 企业级数仓架构设计与维度建模,具备从 0 到 1 搭建离线和实时数仓的完整经验
  • PB 级大规模数据性能调优,精通 Spark/Flink 底层原理和调优方法
  • 数据工程化落地,熟练掌握 CI/CD、容器化和自动化运维技术
  • 数据质量治理,具备搭建全链路数据质量监控体系的经验

以上就是我的自我介绍。

项目深挖

你在美团负责用户和订单域的数仓重构,统一了 80 多个业务指标口径。请具体说明:当时指标口径混乱到了什么程度?举一个最典型的例子;你是如何推动统一指标口径的?具体做了哪些工作;统一口径之后,给业务带来了什么具体的价值?

  1. 混乱程度: 当时就一个order_count,因为不同的计算逻辑,在不同事实表中有着超过10种的口径,导致当时下游使用报表字段的时候需要频繁确认,但依然避免不了使用错误导致的数据不一致问题。导致了我们不得不耗费大量精力去排查这些问题。
  2. 推动过程:
    • 首先,成立了一个数据指标治理小组,成员包括了数据团队、产品、运营和财务的核心人员
    • 然后,制定了统一的指标命名规范和计算逻辑规范,对于不同的统计逻辑,分别明确不同的指标定义,比如 “有效订单” 、 “支付订单”、“完成订单”等
    • 接着,在数仓层面下线了所有非标准的订单表,只在DWS层保留了一个统一的订单事实表,所有下游报表的开发必须基于这个基础表。
    • 最后,搭建了元数据管理平台,将所有指标的定义、计算逻辑、负责人都录入平台,并且建立了指标审批流程
  3. 具体价值:
    • 统一口径后,数据问题的工单下降了80%,为团队每个月节省了超过200小时的问题排查时间
    • 规范化口径后,再也没有出现因为口径不一致导致的业务纠纷
    • 下游开发可以更方便基于元数据管理平台获取所需指标,避免了找不到指标负责人的问题,并且最终数据的可信度也大大提升

你通过数据倾斜治理、分区裁剪和 Spark SQL 调优,将核心订单报表的产出时间从 T+2 小时缩短至 T+45 分钟。请具体说明:这个核心订单报表的计算逻辑是什么?它依赖了哪些表?总数据量有多大;你是如何定位到性能瓶颈的?具体用了哪些工具和方法;请详细说明你做的每一项优化,以及每一项优化分别带来了多少性能提升;优化过程中遇到的最大的技术挑战是什么?你是如何解决的?

  1. 报表详情:这个核心报表是全平台活动效果分析表,计算逻辑是统计每个活动在不同用户标签、不同地区、不同时间段的曝光量、点击量、下单量、支付量及转化率。它依赖了5张核心表:活动维表、用户标签维表(拉链表)、曝光日志表、点击日志表、订单事实表,总数据量约每日3TB。
  2. 瓶颈定位方法:
    • 通过Spark Web UI 的 DAG 图,发现整个作业有3个shuffle阶段,其中第二个shuffle阶段的运行时间占了总时间的70%
    • 然后查看 Task 的执行时间,发现有 10 个 Task 的运行时间超过了 1 小时,而其他 Task 只需要 10 分钟左右,确认存在严重的数据倾斜
    • 最后通过 spark.eventLog.enabled 生成的事件日志,统计出倾斜最严重的 10 个活动 ID,它们的订单量占了总订单量的30%
  3. 优化措施及效果
    • 前置过滤优化: 在SQL最外层过滤掉了不参与活动的订单和无效的曝光日志,减少了 60% 的输入数据量,运行时间缩短了30分钟
    • 分桶优化:将订单事实表和点击日志表按 活动ID 进行分桶,分桶数设置为2000,避免了shuffle阶段的数据重分区,运行时间缩短了20分钟
    • MapJoin优化:将活动维表和用户标签维表广播到所有 executor,在 map 端进行 join,消除了两个shuffle阶段,运行时间缩短了15分钟
    • 数据倾斜治理:将倾斜最严重的10个 活动ID 单独拆分出来,使用加盐法进行处理,运行时间缩短了10分钟
    • 资源调优:将 executor 的数量从 100 增加到 200, 每个 executor 的 core 数从 2 增加到 4,运行时间缩短了5分钟
  4. 最大技术挑战:最大的挑战是用户标签维表太大,无法直接进行MapJoin。用户标签维表有2亿条数据

用户标签维表是怎么设计的,结构是什么样的,为什么这么设计,如何更新

那你在做美团数仓时,Spark 任务数据倾斜是非常高频的问题,你当时遇到过哪些典型的倾斜场景?分别是怎么解决的?给我讲两个最典型的,要结合外卖业务场景。

在美团外卖这个项目里,你负责了活动、订单、骑手三大主题域,那在 DWD 层做维度建模时,你是怎么区分事实表和维度表的?

针对骑手这种会频繁变化的维度信息,你具体是怎么设计拉链表的?用的是全量拉链还是增量拉链?

问题 3-1:你在微软负责 广告日志处理,原来的日志处理任务是怎么设计的?存在哪些具体的性能问题?

问题 3-2:为什么选择 ZSTD 压缩而不是 Snappy 或 Gzip?它带来了多少压缩比提升和性能提升?

最终选择 ZSTD 压缩,是经过了严格的性能测试和对比的,三种压缩算法的对比如下:

  • Snappy:压缩速度最快,但压缩比最低(约2:1),适合对速度要求极高,但对存储不敏感的场景
  • Gzip:压缩比最高(约4.5:1),但解压速度最慢,CPU开销大,适合冷数据存储
  • ZSTD:压缩比与Gzip相当(约4:1),但压缩速度是Gzip的3-5倍,CPU开销低,同时支持分块压缩和随机访问,非常适合热数据处理

除了技术优势外,微软的Cosmos平台对ZSTD有原生的优化支持,能够进一步提升性能

  • 压缩比从原来的 2:1 提升到了 4:1,存储成本直接降了50%
  • 由于数据量减少了一半,网络传输和磁盘IO时间也相对减少,任务整体运行时间缩短了15%
  • 同时,ZSTD的分块压缩特性使得我们可以直接读取数据的一部分,而不需要解压整个文件,大大提升了数据查询效率

问题 3-3:细粒度分区是怎么设计的?原来的分区粒度是什么?为什么要改成现在的粒度?

问题 3-4:你提到的 Spark 参数调优中,哪一个参数的调整带来了最大的性能提升?为什么?请具体说明这个参数的作用,以及你是如何确定调整到什么值的。

在所有Spark参数调优中,spark.shuffle.file.buffer 这个参数的调整带来了最大的性能提升,约占总提升的25%。

参数作用:这个参数控制的是shuffle写过程中每个map任务的输出缓冲区大小,默认是32KB。当缓冲区满了之后,才会将数据溢写到磁盘上

为什么影响最大:我们的日志处理任务是典型的IO密集型任务,shuffle过程中会产生大量的磁盘IO。默认的32KB缓冲区太小,导致每个map任务会频繁地溢写磁盘,产生大量的小文件,严重影响了性能。

如何确定调整值:

  1. 首先通过Spark Web UI 查看shuffle阶段的磁盘IO统计,发现每个map任务平均溢写了10次以上
  2. 然后进行了梯度测试,分别将参数调整为64KB、128KB、256KB、512KB
  3. 测试结果显示,当调整到128KB时,溢写次数减少到了2次,性能提升最明显,继续增大到256KB时,性能提升不明显,反而增加了内存开销
  4. 最终这个参数设置为128KB

实际效果:shuffle阶段的磁盘IO减少了80%,任务整体运行时间缩短了15%

核心技术深度

问题1: 请详细说明 Spark 3.x 的统一内存管理机制。什么是存储内存和执行内存?它们之间是如何动态调整的?如果一个Spark 任务出现 executor OOM,你会按照什么步骤进行排查?

Spark 3.x 采用的是统一内存管理机制,将堆内存划分为四个部分:

  1. 预留内存: 300MB,用于Spark内部使用,用户无法配置

  2. 用户内存:占总内存的40%,

    1. 正确计算方式:

      可用内存 = 总内存 - 预留内存 (300MB)

      用户内存 = 可用内存 × (1 - spark.memory.fraction)

      默认spark.memory.fraction=0.6,所以用户内存默认是可用内存的 40%

      用途:用户自定义数据结构、RDD 依赖、Spark 内部元数据和防止 OOM 的安全缓冲

  3. 统一内存:占总内存的60%

    ​ 正确计算方式:

    ​ 统一内存 = 可用内存 × spark.memory.fraction

    ​ 默认是可用内存的 60%

    ​ 两者确实可以动态占用对方的空闲内存,但有严格的优先级和限制。

  4. 没有单独的 “其他内存” 区域。线程栈、JVM 元数据等都包含在用户内存中,是用户内存的一部分

存储内存和执行内存的动态调整规则

  • 初始时,存储内存和执行内存各占统一内存的50%
  • 当任何一方有空闲内存时,另一方都可以抢占
  • 但当被抢占的一方内存不足时,还回内存的逻辑是不同的
    • 对于执行内存不足时,可以直接驱逐存储内存超过它本身配置的比例的部分(即,默认情况下超过50%的部分),这部分被驱逐的内存,Spark会释放缓存中最近最少使用(LRU)的数据块,将其溢写到磁盘
    • 对于存储内存不足时,只能等待执行内存释放,不能驱逐被执行内存占用的部分

Executor OOM排查步骤:

  1. 第一步:确定OOM类型
    • 查看任务日志,确定是java.lang.OutOfMemoryError: Java heap space还是java.lang.OutOfMemoryError: Direct buffer memory
    • 区分是driver OOM 还是 executor OOM
  2. 第二步: 查看Spark Web UI
    • 查看 Executors页面,观察每个executor内存的使用情况和GC时间
    • 查看Storage页面,观察缓存的数据量和大小
    • 查看Stages页面,观察每个stage的shuffle数据量和数据倾斜情况
  3. 第三步:分析常见原因
    • 数据倾斜:某个executor处理的数据量远大于其他executor,这是最常见的原因
    • shuffle数据量过大:shuffle 阶段产生的数据量超过了executor的内存
    • 大对象:代码中创建了过大的对象,比如一次性collect()大量数据到driver
    • 内存配置不合理:executor内存太小,或者存储内存占比太高
  4. 第四步:针对性解决
    • 数据倾斜:使用加盐法、拆分倾斜Key等方法
    • shuffle优化:调整shuffle参数,使用ZSTD压缩
    • 内存配置:增加executor内存,调整spark.memory.fraction参数
    • 代码优化:避免在driver端处理大量数据,使用广播变量代替大表join

问题2: Flink的CheckPoint机制是如何实现Exactly-Once语义的?什么是两阶段提交(2PC)?Flink的两阶段提交Sink是怎么工作的?

Flink的Exactly-Once语义氛围内部Exactly-Once和端到端Exactly-Once:

  • 内部Exactly-Once通过Checkpoint机制实现
  • 端到端Exactly-Once需要结合Checkpoint机制和两阶段提交(2PC)Sink实现

Checkpoint实现内部Exactly-Once的原理

Checkpoint的核心是barrier对齐和状态持久化:

  1. JobManager会定期向所有Source算子发送Checkpoint barrier
  2. 当 Source 算子收到 barrier 后,会暂停数据处理,将自己的状态持久化到状态后端,然后barrier发送给下游算子
  3. 当一个算子收到所有上游算子的barrier后,会进行barrier对齐,然后将自己的状态持久化到状态后端,再将barrier发送给下游
  4. 当所有算子都完成了状态持久化后,JobManager会标记这个checkpoint为成功
  5. 如果作业失败,Flink会从最近一次成功的Checkpoint恢复状态,重新处理数据,保证数据不会丢失也不会重复

问题:

  1. 如果在barrier对齐阶段,上游某一个算子保存checkpoint-1失败了,checkpoint-2发送过来了,在下游算子,checkpoint-2对齐了,具体的处理是怎样的
  2. 对于在barrier对齐时导致的数据反压,flink是怎么处理的
  3. 什么叫端到端的exectly-once

两阶段提交

两阶段提交是一种分布式事务协议,用于保证分布式系统中多个节点的数据一致性。它分为两个阶段:

  • 预提交阶段:协调者向所有参与者发送预提交请求,参与者执行事务操作,并将结果返回给协调者
  • 正式提交阶段:如果所有参与者都预提交成功,协调者向所有参与者发送正式提交请求,参与者提交事务;如果任何一个参与者预提交失败,协调者向所有参与者发送回滚请求,参与者回滚事务。(预提交阶段,事务就已经完成了99%,只差让修改可见,所以正式提交,仅仅是让修改可见,如果在这个阶段失败的话,参考xxx)

Flink的两阶段提交Sink工作流程

Flink的TwoPhaseCommitSinkFunction 实现了两阶段提交,它将Checkpoint和事务提交结合起来:

  1. 预提交阶段:当算子收到Checkpoint Barrier后,会开启一个事务,将当前批次的数据写入外部系统,但不提交事务。然后将事务ID保存到状态中,进行Checkpoint。(注意:预提交输出的数据,是当前这批数据需要Sink输出的处理结果)
  2. 正式提交阶段:当JobManager确认所有算子都完成了Checkpoint后,会向所有算子发送Checkpoint完成的通知。算子收到通知后,提交之前预提交的事务。
  3. 回滚阶段:如果checkpoint失败,算子会回滚之前预提交的事务

通过这种方式,Flink保证了只有当Checkpoint成功时,数据才会被提交到外部系统,从而实现了端到端的Exactly-Once语义

问题:

  1. Flink两阶段提交的具体实现是在哪里
  2. Flink的回滚,是只删除外部系统中之前写入的checkpoint,但是正常的处理过程不会受影响,是吗,然后会在下一个barrier重新开启一个事务是吗?
  3. 状态中保存事务ID的作用是什么
  4. 如果失败回滚了,本来可能是1分钟统计一次结果的,按照分钟作为目录输出,会不会导致失败的那分钟,没有文件,都写到下一分钟去了?

问题3: 在Hive和Spark SQL中,数据倾斜的根本原因是什么?请分别说明group by倾斜和join倾斜的解决办法,并说明每种方法的适用场景。

数据倾斜的根本原因是shuffle阶段相同key被分配到同一个reducer,而有些key的数据量要远远大于其他key,导致部分reducer处理数据量过大,而执行时间长,甚至失败

group by倾斜的解决方法和适用场景

解决方法 原理 适用场景
Map 端预聚合 在 map 端先进行一次局部聚合,减少 shuffle 到 reducer 端的数据量 大多数 group by 场景,尤其是聚合函数是 sum、count 等可累加的情况
加盐法 给倾斜的 key 加上随机前缀,分散到多个 reducer 进行局部聚合,然后去掉前缀再进行全局聚合 少数几个 key 数据量特别大的场景
过滤无效 key 提前过滤掉 null 值、空字符串等无效 key 倾斜是由大量无效 key 导致的场景

join倾斜的解决方法及适用场景

解决方法 原理 适用场景
MapJoin 将小表广播到所有 executor,在 map 端进行 join,避免 shuffle 其中一个表是小表(小于 1GB)的场景
拆分热点 key 将倾斜的 key 单独拿出来处理,然后和其他 key 的结果合并 热点 key 数量很少(少于 10 个)的场景
加盐法 给两个表的倾斜 key 都加上随机前缀,分散到多个 reducer 进行 join 两个都是大表,且热点 key 数量较多的场景
动态分区法 先统计出倾斜的 key,然后将这些 key 单独分到一个分区,其他 key 分到其他分区,分别进行 join 热点 key 数量较多,但可以提前统计出来的场景
BloomFilter 过滤 用 BloomFilter 过滤掉其中一个表中不存在的 key,减少 join 的数据量 其中一个表的 key 数量远少于另一个表的场景

在Hive和Spark SQL中,这些方法的实现方式略有不同,但原理是一样的。比如Spark SQL可以通过broadcast()函数实现MapJoin,而Hive可以通过设置hive.auto.convert.join=true参数自动开启MapJoin

问题:

  1. 如何在程序中自动识别热点key,而不用每次出现问题单独处理,处理方式可能会遇到什么问题?
  2. mapJoin还有哪些配置和需要注意的地方,有没有什么坑

细粒度分区策略是怎么降低存储的

场景设计题

如果让你设计一个电商平台的订单实时数仓,要求支持秒级的订单指标查询,并且保证数据的一致性和准确性。请说明你的架构设计,包括数据流向、分层设计、技术选型和关键技术点

我会采用Kappa架构来设计这个电商平台的订单实时数仓,因为Kappa架构更加简单,易于维护,能够很好地满足秒级查询和数据一致性的要求

整体数据流向

1
业务数据库(MySQL) -> Canal -> Kafka(ODS层) -> Flink -> Kafka(DWD层) -> Flink -> Doris(DWS/ADS层) -> 业务应用

分层设计

  • ODS层:原始数据层,存储从Canal同步过来的订单binlog数据,保留原始的字段和格式。数据按天+小时分区存储在Kafka中,保留7天
  • DWD层:数据明细层,对ODS层的数据进行清洗、过滤、脱敏和格式转换,生成统一的订单明细事实表。采用星型模型,以订单事实表为中心、关联用户、商品、商家等维度表
  • DWS层:数据汇总层,按照不同的维度(用户、商品、商家、时间等)对DWD层的数据进行预聚合,生成各种宽表。比如订单按天汇总表、订单按商家汇总表等
  • ADS层:数据应用层,根据业务需求,从DWS层的数据生成最终的报表和指标。比如实时订单量、实时交易额、实时转化率等

技术选型

  • 数据采集:Canal,用于实时同步MySQL的binlog数据
  • 消息队列:Kafka,用于解耦数据采集和数据处理,提高吞吐量和高可靠性
  • 实时计算:Flink,用于数据清洗、转换和聚合,提供Exactly-Once语义保证
  • 数据存储:Doris,用于存储DWS层和ADS层的数据,提供秒级的查询性能和事务支持
  • 数据可视化:Grafana,用于展示实时报表和指标,还有报警监控

关键技术点

  • 数据一致性保证:
    1. 使用Flink的Exactly-Once语义,保证数据在Flink内部不会重复也不会丢失
    2. 在Doris端使用幂等写入和事务支持,保证数据写入的一致性
    3. 对于订单状态变更的情况,使用Doris的upsert操作,保证最新的状态覆盖旧的状态
  • 性能优化:
    1. 对于Doris的表进行合理的分区和分桶,提高查询性能
    2. 在Flink端进行预聚合,减少写入Doris的数据量
    3. 使用Drios的物化视图,提前计算好常用的指标
  • 容错机制
    1. Kafka开启3副本机制,保证数据不丢
    2. Flink开启Checkpoint,每30秒保存一次作业状态
    3. Doris开启3副本机制,保证数据的高可用
  • 监控告警:
    1. 监控Kafka的消息堆积量和消费延迟
    2. 监控Flink作业的运行状态和Checkpoint成功率
    3. 监控Doris的查询延迟和写入延迟

问题

  1. 有哪些常用的架构,优缺点,适用场景
  2. 现在mysql同步工具最好的是不是Flink?比较之下优点有哪些