生而为人

程序员的自我修养

0%

状态保存

状态保存的作用

将每次流计算的group结果,保存下来,用于下一次继续聚合。

具体执行内容是如何的?下次聚合会使用状态中的什么数据?

groupState.setTimeoutTimestamp()

  1. With EventTimeTimeout, the user also has to specify the the the event time watermark in the query using Dataset.withWatermark().
  2. With this setting, data that is older than the watermark are filtered out.
  3. The timeout can be set for a group by setting a timeout timestamp usingGroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp.
  4. You can control the timeout delay by two parameters - watermark delay and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than watermark due to the filtering).
  5. Guarantees provided by this timeout are as follows:
  6. Timeout will never be occur before watermark has exceeded the set timeout.
  7. Similar to processing time timeouts, there is a no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream, and the event time of the data has actually advanced.

timeout晚于watermark不生效,早于watermark会报错?到底应该如何设置?

问题

  1. flatmapgroupswithstate 什么时候会timeout
  2. state-store-maintenance-task

参数调整

1
2
3
4
5
6
val spark = SparkSession
.builder()
.appName(appName)
.config(sparkConf)
.getOrCreate()
spark.sessionState.conf.setConfString("spark.sql.streaming.minBatchesToRetain", "5")

这个参数放到sparkConf中,无效。

参考资料

  1. spark 的 structured streaming 状态保存
  2. GroupState — Group State in Arbitrary Stateful Streaming Aggregation
  3. Spark基于事件时间的“状态”流的深层分析 - withWatermark与mapGroupsWithState的关系
  4. State lifecycle management in Structured Streaming
  5. spark api doc
  6. state相关配置
  7. Spark Structured Streaming: 自维护(任意)状态流的“超时”(Timeout)问题
  8. StateStore in Apache Spark Structured Streaming
  9. spark 的 structured streaming 状态保存
  10. GroupStateImpl
  11. Spark 流计算状态管理进化史
  12. StateStoreRDD — RDD for Updating State (in StateStores Across Spark Cluster)
  13. Spark State Tools
  14. structured streaming的checkpoint文件无限增长
  15. State Storage in Spark Structured Streaming
  16. Extending data reprocessing period for arbitrary stateful processing applications
  17. StructuredStream StateStore机制