生而为人

程序员的自我修养

0%

待整理

  1. 本地自定义数据源,会启动多个线程,且不会停止,无法控制单线程输入

本地自定义数据源,运行起来不停止

日志记录

2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:53 - loop:2
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:54 - operationInfo:{"operationTimeStamp":"2021-01-27 16:58:23","pageId":"30001","sessionId":"abcd","unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:53 - loop:2
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:54 - operationInfo:{"operationTimeStamp":"2021-01-27 16:58:23","pageId":"30001","sessionId":"abcd","unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:53 - loop:2
2021-01-27 16:58:23 Source: operation_data_stream (1/1) ERROR LocalOperationSource:54 - operationInfo:{"operationTimeStamp":"2021-01-27 16:58:23","pageId":"30001","sessionId":"abcd","unionId":"123","userId":"user_123"}
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:47 - loop:2
2021-01-27 16:58:23 Source: order_data_stream (1/1) ERROR LocalOrderSource:48 - orderInfo:{"orderId":10001,"orderTimeStamp":"2021-01-27 16:58:23","price":39.8,"spuList":["spu1","spu2","spu3"],"unionId":"123","userId":"user_123"}

现象:

  1. 两个数据源,都不断输出loop:2的记录,不会执行到loop:1,如果删除掉 Thread.sleep() 可以执行到loop:1
  2. 不会停止,按理说只应该执行一次loop:2

原因:

  1. 估计是Thread.sleep()把线程挂起后,无法唤醒,所以执行不到loop:1
  2. 可以判断与数据源无关,是受后续的处理方式影响的,如果直接sink,不会有问题2

原理

Flink 原理与实现:理解 Flink 中的计算资源

优化

Flink 滑动窗口优化

超长滑动窗口优化及解决方案

数据源

flink系列-4、flink自定义source、sink

Flink 如何自定义Source

Flink-04-Flink自定义Source

Flink——自定义Source

5分钟Flink - 自定义Source源

join

Flink如何实现3个实时流同时join,leftjoin,rightjoin

Flink Operator之CoGroup、Join以及Connect

方法详解

richXXXFunction的open,clone方法执行

资源

在线训练地址

数据处理

反压

怎么理解flink的有状态

基础

初识 HBase

Hbase入门详解

一篇文章入门Hbase

原理

HBase原理(读写流程)

hbase为什么能够实现实时读写

hbase知识点及实时读写原理

Hbase原理详解

优化

hbase读写性能测试调优

优化hbase的查询提升读写速率优化案例及性能提升的几种方法

实时系统HBase读写优化–大量写入无障碍

hbase大规模数据写入的优化历程

HBase篇–HBase常用优化

hbase实时优化思路

深入学习

HBase 学习分享

面试

Hbase 基础面试题

⚙️ 原理简述

  • Flink 为许多常见 Scala 类型(如基本类型、集合、元组、case class 等)提供了内置的 TypeInformation 工厂。
  • createTypeInformation 结合 ClassTag 和这些工厂,在编译期生成正确的 TypeInformation,避免了手动构造的繁琐和错误。

⚠️ 注意事项

  • 必须导入:如果忘记导入 org.apache.flink.streaming.api.scala._,编译器会报错“无法找到隐式值”,提示需要 TypeInformation
  • 自定义类型:对于自定义的 case class 或类,只要其成员类型都有可用的 TypeInformationcreateTypeInformation 也能自动组合生成。
  • Table API 中也有类似机制,但通常通过导入 org.apache.flink.table.api.bridge.scala._ 来提供。

[toc]

保存算法

checkpoint保存算法有几种

Flink的Checkpoint机制是其容错和状态一致性的核心。根据触发机制、数据保存方式和处理逻辑的不同,Checkpoint的实现方式可以从几个不同的维度进行分类。

🔀 按屏障对齐方式分类

特性 对齐检查点 (Aligned Checkpoint) 非对齐检查点 (Unaligned Checkpoint)
核心机制 基于Chandy-Lamport分布式快照算法变体–,通过Barrier对齐保证快照一致性。 在Flink 1.11引入,1.13达到生产可用-2,优化反压场景下的Checkpoint性能-33
触发时机 当算子最后一个输入流的Barrier到达时触发--10 当算子第一个输入流的Barrier到达时就触发--10
阻塞情况 Barrier对齐期间会阻塞后续数据处理--10 无需阻塞数据处理,Barrier会越过正在处理的数据-10-33
状态大小 仅保存算子状态,状态大小相对较小-10 额外保存Barrier之后“正在传输中”的数据,状态大小可能膨胀,每个Task可能达到几GB-10
适用场景 状态较小、反压不严重的常规场景。 反压严重、需要保证Checkpoint快速完成的场景--10
Exactly-Once 严格保证。 同样保证Exactly-Once语义-10-11

从Flink 1.13开始,官方引入了超时对齐机制,即“对齐超时后自动切换为非对齐”的混合模式,以平衡两种方式的优缺点-。

📊 按状态备份方式分类

特性 全量检查点 (Full Checkpoint) 增量检查点 (Incremental Checkpoint)
核心机制 每次Checkpoint都完整备份所有状态数据-6 仅备份自上次Checkpoint以来的状态变化部分-5-21
存储开销 较大,每次备份完整状态。 显著降低存储开销,尤其适合大状态场景-6-21
Checkpoint时间 较长,与状态大小成正比。 大幅缩短Checkpoint时间(TB级作业可从3分钟降至30秒)-21
恢复时间 较快,直接加载完整快照。 相对较长,需要从基础快照+多个增量文件恢复-。
适用场景 状态较小、对恢复时间要求高的场景。 状态巨大(GB/TB级别)、希望减少Checkpoint开销的场景-6
支持状态后端 所有状态后端。 主要支持RocksDBStateBackend-21

⚡ 按执行阶段分类

特性 同步阶段 (Synchronous Phase) 异步阶段 (Asynchronous Phase)
执行内容 调用snapshotState()方法,进行状态快照的准备或浅拷贝-19 将同步阶段准备好的状态数据持久化到远程存储-18
对数据处理影响 会阻塞数据处理,是Checkpoint的停顿时间。 不阻塞数据处理,后台线程执行上传-2
优化方向 优化同步阶段的逻辑,减少状态复制开销。 通过增量Checkpoint等机制减少上传数据量,或通过通用增量Checkpoint架构解耦。

🚀 高级优化:通用增量检查点 (Generic Log-Based Incremental Checkpoint)

在Flink 1.15引入,1.16达到生产可用-1-18

这是一种更先进的增量Checkpoint架构,主要优化包括:

  • 核心机制:引入State Changelog机制,将状态更新的变化日志持续写入远程存储(如HDFS/S3),而Checkpoint仅需Flush这些日志-1-18
  • 异步优化:将状态持久化与Checkpoint过程完全解耦,避免大文件上传的不确定性-18
  • 主要优势:Checkpoint完成时间更稳定、快速,端到端延迟更低-1-18

💎 总结

Flink的Checkpoint算法可以根据不同维度进行分类:

  • 按屏障对齐方式:分为对齐非对齐两种,主要权衡反压场景下的Checkpoint完成速度与额外的存储开销。
  • 按状态备份方式:分为全量增量两种,增量方式极大地优化了大状态作业的Checkpoint性能。
  • 按执行阶段:分为同步异步两个阶段,将耗时的持久化操作异步化,减少对数据处理路径的影响。
  • 高级优化通用增量Checkpoint(1.15+)是面向未来的架构,通过解耦Checkpoint与状态存储,实现了更稳定高效的Checkpoint。

这些算法通常组合使用。例如,一个生产作业可能配置为:使用RocksDB状态后端、开启增量Checkpoint、并设置对齐超时自动切换为非对齐

对齐算法中 Barrier对齐期间会阻塞后续数据处理 是什么意思

简单来说,“Barrier对齐期间会阻塞后续数据处理” 指的是Flink为了保证数据一致性,在Exactly-Once模式下执行检查点时,会采取的一种“停下来等”的策略。这个策略确实会导致数据处理出现短暂的“暂停”,以换取状态快照的绝对精准。

⏸️ 阻塞场景:为什么必须“等”?

要理解阻塞,需要先了解Flink的Barrier对齐机制。当JobManager触发一次Checkpoint时,Source算子会向数据流中注入带有ID的Barrier(屏障)-11。对于只有一个输入流的算子,Barrier的处理相对简单,它会在收到Barrier时直接触发快照-23

阻塞主要发生在有多个输入流的算子(如CoProcessFunction, Join等)-23。因为不同的输入流数据到达速度可能不同,Barrier到达的时间也会有先后-。

为了保证快照能精准地反映Barrier到达前那一刻所有输入流的状态,算子必须执行“对齐”操作:

  1. 先到的先等:当算子从某个输入流(比如输入流A)收到了一个Barrier(编号为n)时,它立即停止处理该输入流Barrier之后的任何新数据-23
  2. 继续处理其他:算子会继续处理其他尚未收到Barrier的输入流(比如输入流B)中的数据-11
  3. 对齐完成:直到输入流B的Barrier(编号也是n)也到达后,所有输入流的Barrier才算“对齐”-11。这时,算子才会触发自己的状态快照,并向下游广播Barrier-23

这个过程中,从第一个Barrier到达,到最后一个Barrier到达的这段时间,就是“对齐时间”(Alignment Duration)-2-。在此期间,输入流A的处理被阻塞,输入流A后续的数据会被暂存在一个缓冲区里,等待对齐完成后才被继续处理-23-25。如果这个过程非常耗时,就说明作业可能正经历反压-2

🆚 两种语义:阻塞与不阻塞的区别

这种“阻塞”行为,是实现 Exactly-Once 的关键,它能确保所有输入流的状态在快照时是完全一致的。

如果为了追求低延迟而选择“不等待”,即一旦接收到任何一个Barrier就立即触发快照并继续处理所有数据,这就变成了 At-Least-Once 语义。这样虽然减少了阻塞,但可能在恢复时造成数据重复处理-7

💡 阻塞的影响:反压与性能

“阻塞”虽然保证了准确性,但也可能带来一些副作用:

  • 加剧反压:在数据倾斜或下游处理能力不足时,对齐过程中的等待会进一步减慢数据处理速度,导致Barrier流动缓慢,Checkpoint耗时变长-11
  • 增加内存开销:用于暂存阻塞数据的输入缓冲区会消耗内存。如果对齐时间过长,缓冲区积压的数据会增多,可能导致内存压力过大-23

🔧 解决方案:非对齐检查点 (Unaligned Checkpoint)

为了解决“对齐阻塞”在反压场景下带来的性能问题,Flink从1.11版本开始引入了 非对齐检查点 (Unaligned Checkpoint) -7

它的核心思路是 “不停下来等,而是把等待时正在传输的数据也保存起来”。当算子的某个输入流收到Barrier后,它不再阻塞该通道,而是立即开始做快照,并将所有输入和输出缓冲区中“正在飞行”的数据作为快照的一部分一起持久化-11这相当于把对齐的压力从运行时转移到了恢复时,有效避免了因对齐阻塞而加剧反压的问题-11

💎 总结

“Barrier对齐阻塞”是Flink为换取Exactly-Once语义而设计的机制,它通过在快照制作时暂停部分数据处理来确保状态的一致性。这个机制在反压时可能成为瓶颈,而非对齐检查点正是为了解决这一问题而生的。

书中exactly once语义一节中看到一句话,其中一个步骤是算子任务对状态做快照,书上说,当分界线传递到了内部的window()算子,它同样会对自己的状态进行快照保存,并写入远程的持久化存储中。状态后端不是有两种吗,其中一种是写入内存的,为什么这里直接说写入持久化存储了?

这个观察很敏锐。书中的描述其实是从实现原理的角度出发的。对于Flink来说,只要开启了Checkpoint,即使使用看起来只存在内存中的MemoryStateBackend,状态也会以文件形式被持久化到远程存储。这其实是Flink实现Exactly-Once语义的基石。

MemoryStateBackend这个名称确实容易引起误解,它指的主要是运行时状态在内存中,但Checkpoint数据依然需要可靠的持久化。


🧐 一个名字引发的误解:MemoryStateBackend

根据Flink官方文档,MemoryStateBackend的核心行为是:

  • 运行时状态:保存在TaskManager的JVM堆内存中-1
  • Checkpoint时:状态数据被发送到JobManager,并为了高可用性(HA)被持久化到文件系统中-1。它本质上是一个可以脱离文件系统工作的“基于文件系统的后端”-1

这种设计在Flink 1.12的官方文档里说得更直接:MemoryStateBackend在Checkpoint时,“将快照信息作为CheckPoint应答消息的一部分发送给JobManager(master),同时JobManager也将快照信息存储在堆内存中”-5。这里并没有提及直接持久化到远程存储,所以书里的说法,很可能是基于更高版本中对其HA机制的描述,或是从“实现Exactly-Once语义”这个更高阶的视角出发,将Checkpoint视为最终必须写入持久化存储的行为。

💡 概念重塑:状态后端与检查点存储的“解耦”

Flink 1.15及之后版本的重要演进,可以帮助我们更好地理解这种设计:Flink将状态后端(State Backend)和检查点存储(Checkpoint Storage)这两个概念彻底分开了-3。这意味着,你完全可以组合使用不同的后端和存储方式:

  • HashMapStateBackend (等价于MemoryStateBackend) 处理运行时状态。
  • JobManagerCheckpointStorage 负责将Checkpoint数据存储在JobManager内存中,适用于本地测试-3

但为了实现生产级的容错(Exactly-Once语义),通常会将HashMapStateBackendFileSystemCheckpointStorage搭配使用,这样状态快照就会被可靠地写入HDFS或S3等分布式文件系统-3


⚙️ 三种官方状态后端如何工作

下图清晰地展示了Flink官方提供的三种状态后端,在工作状态存储和Checkpoint持久化上的分工:

状态后端 工作状态存储 (TaskManager 本地) Checkpoint 持久化存储 (远程)
MemoryStateBackend (旧版,已不推荐) TaskManager JVM 堆内存-2 JobManager 内存 (HA模式下可持久化到文件系统)-1
FsStateBackend (旧版,已不推荐) TaskManager 内存-2 远程文件系统 (如 HDFS, S3)-2
RocksDBStateBackend (推荐) 本地 RocksDB 实例 (内存+磁盘)-2 远程文件系统-2

💎 总结

所以,书上的那句话并没有错。它强调的是Exactly-Once语义实现的关键一环——将全局一致的状态快照进行持久化存储,而具体存到哪里,则由所选的状态后端和检查点存储配置共同决定。MemoryStateBackend虽然运行时状态在内存中,但为了实现生产级的容错,它的Checkpoint数据最终也需要被写入可靠的存储中。

也就是说,当开启了checkpoint,状态可能会存两份,一份在内存,一份要持久化?

是的,你的理解非常准确。开启 Checkpoint 后,Flink 的状态确实会存在“两份”:一份是用于处理实时数据的工作状态(Working State),另一份是用于故障恢复的快照状态(Snapshot State)

这“两份”数据虽然内容上有关联,但它们的存储位置、生命周期和作用是完全不同的。

两份状态的详细对比

特性 工作状态 (Working State) 快照状态 (Snapshot State / Checkpoint)
作用 用于处理实时数据,例如存储窗口中的元素、键控状态的值。 用于故障恢复,是一个全局一致的状态副本
存储位置 存储在 TaskManager 本地。根据状态后端不同,可能在内存或RocksDB。 存储在 远程持久化存储(如 HDFS, S3)。
生命周期 随作业持续运行,不断被读写和更新。 周期性生成,且是只读的,生成后不再改变。
数量 一份(每个算子实例一个)。 可能有多份(由 num-retained 控制)。
是否必须 是,数据处理的基础。 是,容错机制的基础。

一个类比:文档编辑的“自动保存”

你可以把它们想象成正在编辑一份文档:

  • 工作状态 就是你屏幕上正在编辑的文档内容,你所有的修改都在这里实时发生。它就相当于保存在你电脑内存/缓存里的“工作状态”。
  • 快照状态 就是你每隔一段时间点击“保存”后,存到硬盘上的那个文件。那个文件是过去某个时间点的静态副本,主要用于系统崩溃后恢复。

而开启 Checkpoint 后的过程就像这样:

  1. 你的文档(工作状态)一直在内存中,你可以随时编辑(处理数据)。
  2. 每隔几分钟,系统自动帮你把当前文档内容完整地存一次盘(生成快照)。
  3. 你的电脑内存里有一份正在编辑的文档(工作状态),硬盘上则可能保留着最近几次存盘的副本(快照状态)。

两点补充说明

  1. “两份”不等于“双倍”开销
    • 虽然数据是两份,但生成快照的过程(特别是异步和增量快照)已经高度优化,通常不会让数据处理的延迟翻倍。
    • 增量快照每次只保存自上次快照以来的变化部分,而不是全部数据,这大大减少了快照的数据量和网络传输。
  2. 不是所有状态后端都“在内存”
    • 如果使用 RocksDBStateBackend,工作状态本身主要存储在本地磁盘(RocksDB),而不仅仅是内存。这时,“两份”的含义就变成了:一份在本地磁盘(工作状态),另一份在远程存储(快照)

总结

所以,你的理解完全正确:Checkpoint 机制本质上就是用远程存储的静态快照,为本地运行的动态工作状态提供了一层可靠的容错保障。这是一套非常经典的“动态数据 + 静态备份”的分布式系统设计模式。

[toc]

Map[String, () => Solution]

以函数作为map值存储的

在 Scala 中,Map[String, () => Solution] 里的 () => Solution 是一个函数类型,它表示一个不接受任何参数、返回一个 Solution 对象的函数。将这样的函数作为 Map 的值存储,体现了 函数作为一等公民(first-class citizen) 的特性,即函数可以像普通值一样被传递、存储和调用。

具体来说,这里的 () => Solution 是一个函数值(function value),它是函数类型的实例。你可以把它看作是一个工厂:当你需要一个新的 Solution 实例时,就调用这个函数(通过 apply() 或直接加括号 ()),它会生成一个实例。这种存储函数而不是直接存储实例的做法,本质上是在存储一个生产实例的行为,而不是实例本身。

这种设计模式常被称为工厂函数存储延迟初始化,它是函数式编程中常见的技巧,利用了函数作为值的灵活性。

函数的返回值

根据返回值数量选择合适的数据类型

固定数量、相同类型 → 元组 / case class

可变数量、相同类型 → 集合(List、Seq、Vector 等)

可变数量、不同类型 → 用 sealed trait 和集合模拟,或使用 HList(shapeless)

两种可能的结果 → Either、Try、Option

当返回值可能成功也可能失败时:使用 Either 或 Try

如果你的函数可能返回多种结果(例如成功时返回一个值,失败时返回错误信息),可以使用 Either

1
2
3
def divide(a: Int, b: Int): Either[String, Int] = 
if (b == 0) Left("Division by zero")
else Right(a / b)

Either 通常用于表示两种可能的结果,但你可以嵌套组合来表示更多情况。

富函数

为什么可以通过富函数类来自定义按键分区状态

在 Flink 中,富函数(RichFunction) 能够自定义按键分区状态(Keyed State),是因为 Flink 的设计将 “运行时上下文(RuntimeContext)”“状态后端(State Backend)” 无缝集成到了富函数的生命周期中。

简单来说:富函数充当了用户逻辑与 Flink 分布式状态管理机制之间的桥梁。下面从几个关键点解释其原理。


1. 富函数提供 RuntimeContext,这是状态管理的入口

当你在 KeyedStream 上使用富函数(如 RichFlatMapFunctionKeyedProcessFunction)时,Flink 会在算子初始化时通过 open() 方法将 RuntimeContext 注入到函数实例中。

RuntimeContext 提供了以下关键方法,用于创建和管理按键分区状态:

  • getState(ValueStateDescriptor)
  • getListState(ListStateDescriptor)
  • getMapState(MapStateDescriptor)
  • getReducingState(ReducingStateDescriptor)
  • getAggregatingState(AggregatingStateDescriptor)

这些方法返回的状态对象(如 ValueState)会自动绑定到当前处理的数据的 key 上。


2. 按键分区状态的自动 key 感知

按键分区状态的核心特点是:每个不同的 key 都有自己独立的状态副本。富函数在 KeyedStream 上使用时,Flink 的运行时框架会自动将当前处理的元素的 key 与状态对象关联

具体流程:

  • 当数据进入算子时,Flink 知道这条数据属于哪个 key(通过 KeySelector 或分区方式确定)。
  • 在调用 processElement() 等方法时,RuntimeContext 内部会根据当前 key 找到对应的状态分区。
  • 用户代码通过状态对象读写时,实际上操作的是当前 key 对应的状态,无需手动管理 key 到状态的映射。

这种机制被称为 “按键分区状态的自动作用域(scoping)”


3. 状态后端负责实际存储和容错

用户通过富函数定义的状态描述符(StateDescriptor)只声明了状态的名称、类型、序列化器以及可选的TTL(Time-To-Live)。真正的状态数据由 状态后端(State Backend) 管理:

  • **MemoryStateBackend**:状态存储在 JVM 堆内存中,适合本地测试。
  • **FsStateBackend**:状态存储在内存中,但检查点会持久化到文件系统。
  • **RocksDBStateBackend**:状态存储在本地 RocksDB 数据库中,支持超大状态。

无论使用哪种状态后端,富函数中的状态操作都通过统一的 State 接口访问,Flink 会在后台处理序列化、缓存、磁盘读写、检查点快照等复杂逻辑。


4. 容错与一致性保证

富函数中的按键分区状态会自动参与检查点(Checkpoint) 机制。当作业失败恢复时,Flink 会从最近成功的检查点中恢复所有 key 的状态,并将其重新加载到对应的算子实例中。这一过程对用户代码完全透明,只需在富函数中定义状态即可。


5. 设计优势:为什么不用普通函数?

如果不使用富函数,而是自己维护一个 Map<Key, Value> 来实现类似功能,会遇到以下问题:

  • 内存溢出:状态无法自动溢写到磁盘。
  • 故障恢复困难:需要手动实现状态的快照和恢复。
  • 无法感知 key 的变化:需要手动从数据中提取 key 并维护映射。
  • 性能差:缺少 Flink 内部的优化(如状态缓、增量快照、异步检查点等)。

富函数 + 按键分区状态的设计,将分布式状态管理的复杂性完全封装,让用户能像写单机程序一样使用状态,同时享受 Flink 提供的高性能、一致性和容错能力。


💎 总结

组件 作用
富函数 提供生命周期方法(openclose)和运行时上下文(RuntimeContext)。
RuntimeContext 状态创建入口,自动绑定当前 key。
StateDescriptor 声明状态的名称、类型、序列化器、TTL 等元信息。
状态后端 实际存储状态数据,支持内存、文件系统、RocksDB,并负责检查点容错。

因此,通过富函数自定义按键分区状态,本质上是 Flink 将分布式状态管理抽象为易用的编程模型,让开发者无需关心底层的 key 路由、状态分区、容错快照等细节。

富函数只能在KeyedStream上使用吗

不是的,富函数(RichFunction)并不仅限于 KeyedStream,它可以在任何数据流上使用

几乎所有 Flink 算子(如 map, filter, flatMap 等)都同时提供了普通函数和对应的富函数版本,例如 RichMapFunction, RichFilterFunction, RichFlatMapFunction-20-8。即使是在非按键分区的普通数据流(DataStream)上,你依然可以无缝地使用这些富函数-。


🧩 富函数在非KeyedStream上的主要应用

虽然不能使用键控状态(Keyed State),但在非KeyedStream上,富函数依然提供了两个非常重要的能力:

  • **生命周期管理 (Lifecycle Management)**:RichFunction 提供了 open()close() 两个生命周期方法-36
    • open()**:在算子的实际工作方法(如 map())被调用之前执行一次,常用于一次性初始化**任务,如建立数据库连接、加载配置文件等-20
    • close()**:在算子生命周期结束时调用,用于执行清理工作**,如关闭连接、释放资源等-20
  • 获取运行时上下文 (Access to RuntimeContext)**:通过 getRuntimeContext() 方法,可以获取算子的运行时信息,例如算子并行度、任务名称**等-36。这为开发通用、自适应的函数提供了基础。

🚫 在非KeyedStream上的能力限制

在非 KeyedStream 上使用富函数时,确实无法访问以下两项关键功能:

  • **无法访问键控状态 (Keyed State)**:键控状态是基于 Key 进行分区的,由于 DataStream 没有经过 keyBy() 分区,因此无法获取和使用 ValueState, ListState 等键控状态--5
  • **无法访问定时器服务 (TimerService)**:TimerService(如 registerEventTimeTimer())依赖于为特定 Key 注册定时器的能力,因此在非 KeyedStream 上不可用--1

💎 总结

  • 能否使用? 可以,富函数可以用在任何数据流上。
  • 与 KeyedStream 的区别?KeyedStream无法使用键控状态和定时器服务-2,但可以正常使用生命周期方法和运行时上下文。