21/09/08 07:09:55 INFO StateStore [state-store-maintenance-task]: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@2f640a0b
User class threw exception: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
metricValueProvider (org.apache.kafka.common.metrics.KafkaMetric)
metrics (org.apache.kafka.common.metrics.Metrics)
metrics (org.apache.kafka.clients.producer.internals.BufferPool)
free (org.apache.kafka.clients.producer.internals.RecordAccumulator)
accumulator (org.apache.kafka.clients.producer.KafkaProducer)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:292)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1493)
	at com.microsoft.sam.HistoryDataUploader.run(HistoryDataUploader.scala:121)
	at com.microsoft.sam.HistoryDataUploader$.main(HistoryDataUploader.scala:207)
	at com.microsoft.sam.HistoryDataUploader.main(HistoryDataUploader.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.RuntimeException: Could not serialize lambda
	at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:69)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
	... 35 more
Caused by: java.lang.NoSuchMethodException: org.apache.kafka.common.network.Selector$SelectorMetrics$$Lambda$38/747274186.writeReplace()
	at java.lang.Class.getDeclaredMethod(Class.java:2130)
	at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:60)
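The trace shows `SparkContext.broadcast` (called from `HistoryDataUploader.run`) being handed an object that holds a `KafkaProducer`. Kryo walks into the producer's metrics and fails on a Java lambda inside Kafka's `Selector$SelectorMetrics` that has no `writeReplace()`, i.e. a non-serializable lambda. A common workaround, sketched here as an assumption (these notes don't say how the job was actually fixed), is to broadcast only the producer configuration and build the producer lazily in each executor JVM. `KafkaSink` and its `send` method are hypothetical names, not part of Spark or Kafka.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical wrapper: the only state that gets serialized is the Properties
// object; the KafkaProducer is created lazily in whichever JVM first uses it.
class KafkaSink(config: Properties) extends Serializable {
  // @transient: Java serialization and Kryo's FieldSerializer both skip this
  // field, so a producer that was already created is never shipped.
  @transient lazy val producer: KafkaProducer[String, String] =
    new KafkaProducer[String, String](config, new StringSerializer, new StringSerializer)

  def send(topic: String, value: String): Unit = {
    producer.send(new ProducerRecord[String, String](topic, value))
  }
}
```

On the driver this would be used as `val sink = sc.broadcast(new KafkaSink(props))`, and inside a task as `sink.value.send(topic, value)`; each executor then typically builds its own producer the first time a task calls `send`.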
With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark().
With this setting, data that is older than the watermark is filtered out.
The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout occurs when the watermark advances beyond the set timestamp.
You can control the timeout delay with two parameters: the watermark delay, and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark because of the filtering).
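A minimal sketch of how these pieces fit together, assuming Spark 2.2 or later. The `Event`, `SessionState` and `SessionUpdate` case classes, the `user`/`eventTime` fields, the 5-minute watermark delay and the 10-minute idle gap are hypothetical values chosen for illustration. The watermark delay goes into `withWatermark`, and the additional duration is added to the latest event time in the group before calling `GroupState.setTimeoutTimestamp`.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical record types, for illustration only.
case class Event(user: String, eventTime: Timestamp)
case class SessionState(lastSeenMs: Long, count: Long)
case class SessionUpdate(user: String, count: Long, expired: Boolean)

object EventTimeTimeoutSketch extends Serializable {
  // Hypothetical "additional duration beyond the timestamp in the event".
  val IdleGapMs: Long = 10 * 60 * 1000L

  def updateSession(
      user: String,
      events: Iterator[Event],
      state: GroupState[SessionState]): Iterator[SessionUpdate] = {
    if (state.hasTimedOut) {
      // Invoked with no events: the watermark has moved past the timeout timestamp.
      val finished = state.get
      state.remove()
      Iterator(SessionUpdate(user, finished.count, expired = true))
    } else {
      val batch = events.toList // non-empty when the group has not timed out
      val old = state.getOption.getOrElse(SessionState(0L, 0L))
      val latest = math.max(old.lastSeenMs, batch.map(_.eventTime.getTime).max)
      val updated = SessionState(latest, old.count + batch.size)
      state.update(updated)
      // Late events were already dropped by the watermark, so latest + IdleGapMs
      // should be ahead of it; max() is only a defensive guard, because (at least
      // in Spark 2.x) setTimeoutTimestamp throws IllegalArgumentException for a
      // timestamp earlier than the current watermark.
      state.setTimeoutTimestamp(math.max(latest + IdleGapMs, state.getCurrentWatermarkMs()))
      Iterator(SessionUpdate(user, updated.count, expired = false))
    }
  }

  def sessions(spark: SparkSession, events: Dataset[Event]): Dataset[SessionUpdate] = {
    import spark.implicits._
    events
      .withWatermark("eventTime", "5 minutes") // hypothetical watermark delay
      .groupByKey(_.user)
      .flatMapGroupsWithState[SessionState, SessionUpdate](
        OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(updateSession)
  }
}
```

When the timeout fires, the function is called for that key with an empty iterator and `state.hasTimedOut` set to true, which is why the sketch emits the final result and removes the state in that branch.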
Guarantees provided by this timeout are as follows:
Timeout will never occur before the watermark has exceeded the set timeout.
Similar to processing-time timeouts, there is no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream and the event time of the data has actually advanced.
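To make the two guarantees concrete, here is a deliberately simplified model in plain Scala; it is not Spark's implementation. Each micro-batch is a list of event times in epoch milliseconds, and the watermark is the largest event time seen so far minus the delay, starting at 0. `WatermarkTimeoutModel` and its methods are hypothetical names.

```scala
// Simplified model of the guarantees above, not Spark's implementation.
object WatermarkTimeoutModel {
  /** Watermark after each micro-batch: the max event time seen so far minus
    * the delay. It starts at 0, never moves backwards, and an empty batch
    * leaves it unchanged. */
  def watermarksAfterEachBatch(batches: Seq[Seq[Long]], delayMs: Long): Seq[Long] =
    batches
      .scanLeft(0L) { (wm, batch) =>
        if (batch.isEmpty) wm else math.max(wm, batch.max - delayMs)
      }
      .tail

  /** Index of the first batch after which the watermark exceeds timeoutTs.
    * The timeout can only be processed in a later micro-batch. None means the
    * watermark never gets there, so the timeout never fires. */
  def batchThatPassesTimeout(
      batches: Seq[Seq[Long]],
      delayMs: Long,
      timeoutTs: Long): Option[Int] = {
    val i = watermarksAfterEachBatch(batches, delayMs).indexWhere(_ > timeoutTs)
    if (i >= 0) Some(i) else None
  }
}
```

With a 5-second delay and a timeout timestamp of 10000, batches carrying event times 0, 12000 and 16000 produce watermarks 0, 7000 and 11000, so the third batch (index 2) is the first to push the watermark past the timeout. If the third batch only carried an event at 14000, the watermark would stop at 9000 and the timeout would not fire, however long the stream then stays idle.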
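Setting the timeout later than the watermark has no effect, and setting it earlier than the watermark throws an error? How should it actually be set?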
Problems
When does flatMapGroupsWithState time out?
state-store-maintenance-task
Parameter tuning
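import org.apache.spark.sql.SparkSession

// appName and sparkConf are defined elsewhere in the job.
val spark = SparkSession
  .builder()
  .appName(appName)
  .config(sparkConf)
  .getOrCreate()
// Retain (at least) the 5 most recent batches of state and checkpoint data
// instead of the default 100, so older state-store files can be cleaned up sooner.
spark.sessionState.conf.setConfString("spark.sql.streaming.minBatchesToRetain", "5")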