
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#operations-on-streaming-dataframesdatasets

https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html

The Internals of Spark Structured Streaming (Apache Spark 2.4.4)

Read

Spark——Structured Streaming

  1. Overview
  2. How watermarks work
    1. For a watermark to take effect, withWatermark and groupBy must use the same time field
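
The rule in the note above can be sketched as follows; the streaming DataFrame `events` and the columns `ts` and `userId` are assumed names, not from the source:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("watermark-sketch").getOrCreate()
import spark.implicits._

// The same event-time column "ts" is passed to withWatermark and used
// inside the windowed groupBy — referencing two different time fields
// here is what stops the watermark from working.
val counts = events
  .withWatermark("ts", "10 minutes")
  .groupBy(window($"ts", "5 minutes"), $"userId")
  .count()
```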

Unread

I. Introduction

Interview with Zhu Shixiong: Structured Streaming, the new streaming engine in Apache Spark

After reading this post, can you still claim you don't know Structured Streaming?

Note_Spark_Day13:Structured Streaming

Spark Structured Streaming notes

Spark tutorial: an introductory programming guide to Spark Structured Streaming

Spark from beginner to expert (09): Structured Streaming (part 1)

Advanced features of Spark Structured Streaming

Spark Structured Streaming

II. Architecture

III. Watermark

IV. State

V. Streaming + Kafka

How does a restarted job guarantee it resumes exactly from the previous consumption position?
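
For reference, the usual answer from the integration guide: pin the query to a fixed checkpointLocation, so the offsets recorded there (not Kafka auto-commit) are used on restart. A hedged sketch; broker, topic, and paths are invented:

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  // startingOffsets only matters for a brand-new query; once the
  // checkpoint below exists, the query resumes from its recorded offsets.
  .option("startingOffsets", "latest")
  .load()

df.writeStream
  .format("parquet")
  .option("path", "hdfs:///data/out")
  // Must stay stable across restarts — offsets and state live here.
  .option("checkpointLocation", "hdfs:///checkpoints/this-query")
  .start()
```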

Official docs: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

The right way to consume Kafka from Spark Streaming

Tutorial: Use Apache Spark Structured Streaming with Apache Kafka on HDInsight

Processing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2

How to write spark streaming DF to Kafka topic

Apache Kafka transactional writer with foreach sink, is it possible?

Writing data from Spark to Kafka

Spark : Best way to Broadcast KafkaProducer to Spark streaming

Spark Dataframe to Kafka

How to write a Dataset to Kafka topic?
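
The links above circle one recipe: project the data into a `value` column and use the built-in kafka sink, rather than broadcasting a KafkaProducer (which fails to serialize, as the stack trace later in these notes shows). A sketch; broker, topic, and path are invented:

```scala
import org.apache.spark.sql.functions.{struct, to_json}

// The kafka sink expects a "value" column (and optionally "key"/"topic");
// here every row is serialized to JSON as the message value.
df.select(to_json(struct($"*")).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "out-topic")
  .option("checkpointLocation", "hdfs:///checkpoints/kafka-sink")
  .start()
```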

VI. Files

Getting started with Spark 2.0: file streams with Structured Streaming

Processing file stream data with Spark Streaming

Streaming reads of HDFS data in Spark

VII. Config

Configuration Properties

SQLConf.scala

VIII. Stages

InMemoryTableScanExec Leaf Physical Operator

IX. Source code

Spark Structured Streaming source code analysis (3): storage and update of aggregation state

X. Monitoring

ProgressReporter Contract

StreamingQueryListener — Intercepting Life Cycle Events of Streaming Queries

How to use the new Structured Streaming UI in Spark 3.0 for troubleshooting

XX. Uncategorized

MicroBatchExecution — Stream Execution Engine of Micro-Batch Stream Processing

Spark runs but no job is started

Spark Structured Streaming app has no jobs and no stages

Questions:

  1. spark broadcast kafkaproducer
  2. dataframe write to kafka each
  3. structured streaming streaming file reads from nested directories
  4. structured streaming finished without job

Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode
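
The error above means sorting a plain streaming Dataset is rejected; it is only accepted on top of an aggregation whose result is written in Complete output mode. A sketch with assumed names:

```scala
// orderBy is rejected on the raw stream, but accepted here because the
// query aggregates first and is written with Complete output mode.
val topUsers = events
  .groupBy($"userId")
  .count()
  .orderBy($"count".desc)

topUsers.writeStream
  .outputMode("complete")   // sorting on a stream requires Complete mode
  .format("console")
  .start()
```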

Structured Streaming Programming Guide

The job stays in this state for several minutes:

21/09/08 07:09:55 INFO StateStore [state-store-maintenance-task]: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@2f640a0b

Streaming job gets stuck writing to checkpoint

Spark stateful streaming processing is stuck in StateStoreSave stage!

User class threw exception: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
metricValueProvider (org.apache.kafka.common.metrics.KafkaMetric)
metrics (org.apache.kafka.common.metrics.Metrics)
metrics (org.apache.kafka.clients.producer.internals.BufferPool)
free (org.apache.kafka.clients.producer.internals.RecordAccumulator)
accumulator (org.apache.kafka.clients.producer.KafkaProducer)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:292)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1493)
at com.microsoft.sam.HistoryDataUploader.run(HistoryDataUploader.scala:121)
at com.microsoft.sam.HistoryDataUploader$.main(HistoryDataUploader.scala:207)
at com.microsoft.sam.HistoryDataUploader.main(HistoryDataUploader.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.RuntimeException: Could not serialize lambda
at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:69)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
... 35 more
Caused by: java.lang.NoSuchMethodException: org.apache.kafka.common.network.Selector$SelectorMetrics$$Lambda$38/747274186.writeReplace()
at java.lang.Class.getDeclaredMethod(Class.java:2130)
at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:60)

On Spark 3.0, onQueryProgress is invoked only after a long delay; this problem did not occur on 2.4.

Spark Structured Streaming StreamingQueryListener.onQueryProgress not called per microbatch?

Purpose of state persistence

The group results of each micro-batch are persisted so that the next micro-batch can continue the aggregation from them.

What exactly does that execution involve? Which data from the state does the next aggregation use?
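
To the question above, roughly: for each group key, the StateStore holds the partial aggregate (e.g. a count) from all previous batches; the new batch's rows are merged into it and the updated value is written back. The user-visible side, with assumed names:

```scala
// `count` reflects every batch seen so far, because each micro-batch
// loads the key's previous count from the state store and adds to it.
val running = events
  .groupBy($"userId")
  .count()

running.writeStream
  .outputMode("update")   // emit only the keys whose state changed this batch
  .format("console")
  .start()
```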

groupState.setTimeoutTimestamp()

  1. With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark().
  2. With this setting, data older than the watermark is filtered out.
  3. The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout occurs when the watermark advances beyond the set timestamp.
  4. You can control the timeout delay by two parameters: the watermark delay and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark due to the filtering).
  5. Guarantees provided by this timeout are as follows:
  6. The timeout will never occur before the watermark has exceeded the set timeout timestamp.
  7. Similar to processing-time timeouts, there is no strict upper bound on the delay before the timeout actually fires. The watermark can advance only when there is data in the stream and the event time of that data has actually advanced.
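
A sketch of points 1–4 using flatMapGroupsWithState; the Event case class, column names, and the 30-minute extra delay are assumptions for illustration:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(userId: String, ts: Timestamp)

// Emits (userId, count) once the watermark passes the group's timeout timestamp.
def onGroup(userId: String, rows: Iterator[Event],
            state: GroupState[Long]): Iterator[(String, Long)] = {
  if (state.hasTimedOut) {        // the watermark has passed the timestamp set below
    val out = Iterator((userId, state.get))
    state.remove()
    out
  } else {
    state.update(state.getOption.getOrElse(0L) + rows.size)
    // Must be later than the current watermark, or Spark throws; the extra
    // 30 minutes here is the "additional duration" from point 4 above.
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 30 * 60 * 1000)
    Iterator.empty
  }
}

val sessions = events.as[Event]
  .withWatermark("ts", "10 minutes")        // required for EventTimeTimeout
  .groupByKey(_.userId)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(onGroup)
```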

A timeout set later than the watermark does not fire yet, while one set earlier than the watermark raises an error? How exactly should it be set?

Questions

  1. When does flatMapGroupsWithState time out?
  2. What does state-store-maintenance-task do?

Parameter tuning

val spark = SparkSession
  .builder()
  .appName(appName)
  .config(sparkConf)
  .getOrCreate()
spark.sessionState.conf.setConfString("spark.sql.streaming.minBatchesToRetain", "5")

Putting this parameter in sparkConf has no effect.

References

  1. State persistence in Spark Structured Streaming
  2. GroupState — Group State in Arbitrary Stateful Streaming Aggregation
  3. In-depth analysis of event-time stateful streams in Spark: the relationship between withWatermark and mapGroupsWithState
  4. State lifecycle management in Structured Streaming
  5. Spark API docs
  6. State-related configuration
  7. Spark Structured Streaming: the timeout problem of arbitrary self-maintained stateful streams
  8. StateStore in Apache Spark Structured Streaming
  9. GroupStateImpl
  10. A history of state management in Spark stream processing
  11. StateStoreRDD — RDD for Updating State (in StateStores Across Spark Cluster)
  12. Spark State Tools
  13. Unbounded growth of Structured Streaming checkpoint files
  14. State Storage in Spark Structured Streaming
  15. Extending data reprocessing period for arbitrary stateful processing applications
  16. StructuredStream StateStore internals

Check whether a jar contains a given file

jar vtf analysis-tool-1.0.0-SNAPSHOT.jar application.yml