生而为人

程序员的自我修养

0%

原始sql:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
-- Counts viewed and purchase-intention SKUs / customers per business unit,
-- category and seller type, with roll-up rows generated by GROUPING SETS.
-- The dt values are the hard-coded sample partitions from the original notes.
WITH sku_info AS (
    -- One row per (sku, bu, seller type, cat1, cat2); is_onshelf is the
    -- maximum on_shelf flag seen for that combination.
    SELECT
        a.sku_id,
        d.bu_id,
        if(c.management_type = 3, 'pop', '自营') AS is_3p,
        b.cat1_id,
        b.cat1_name,
        b.cat2_id,
        b.cat2_name,
        max(on_shelf) AS is_onshelf
    FROM (
        -- Grid-level snapshot of the 20210317 partition ...
        SELECT
            sku_code AS sku_id,
            sales_grid_id,
            on_shelf,
            bu_id
        FROM mart_caterb2b.dim_prod_all_map_csu__grid_his
        WHERE dt = '20210317'
          AND channel_id = '1001'
        UNION ALL
        -- ... plus on-shelf log rows in the update_time window, de-duplicated.
        SELECT
            sku_code,
            sales_grid_id,
            on_shelf,
            bu_id
        FROM mart_caterb2b.fact_caterb2b_csu_grid_onshelf_log
        WHERE update_time >= '2021-03-18'
          AND update_time <= '2021-03-19'
          AND on_shelf = 1
          AND channel_id = '1001'
        GROUP BY
            sku_code,
            sales_grid_id,
            on_shelf,
            bu_id
    ) a
    LEFT JOIN (
        SELECT DISTINCT
            sku_id,
            cat1_id,
            cat1_name,
            cat2_id,
            cat2_name
        FROM mart_caterb2b.dim_caterb2b_csu_his
        WHERE dt = '20210318'
          AND channel_id = '1001'
    ) b
        ON a.sku_id = b.sku_id
    -- NOTE(review): the two dimension tables below are joined without a dt
    -- filter -- confirm they are not date-partitioned.
    LEFT JOIN mart_caterb2b.dim_sm_business_entity c
        ON a.bu_id = c.bu_id
    LEFT JOIN mart_caterb2b.dim_sell_grid d
        ON a.sales_grid_id = d.sell_grid_id
    GROUP BY
        a.sku_id,
        d.bu_id,
        if(c.management_type = 3, 'pop', '自营'),
        b.cat1_id,
        b.cat1_name,
        b.cat2_id,
        b.cat2_name
)

SELECT
    -- NULL dimension values (e.g. on roll-up rows) are labelled '-1' / '全部'.
    coalesce(f.bu_id, '-1') AS bu_id,
    coalesce(f.cat1_id, '-1') AS cat1_id,
    coalesce(f.cat1_name, '全部') AS cat1_name,
    coalesce(f.cat2_id, '-1') AS cat2_id,
    coalesce(f.cat2_name, '全部') AS cat2_name,
    coalesce(f.is_3p, '全部') AS is_3p,
    count(DISTINCT if(f.is_view = 1, f.sku_id, NULL)) AS view_sku_ct,
    count(DISTINCT if(f.is_view = 1, f.customer_id, NULL)) AS view_uv,
    count(DISTINCT if(f.is_view = 1, f.sku_id, NULL), if(f.is_view = 1, f.customer_id, NULL)) AS view_sku_uv,
    -- The intention metrics previously reused the view_* aliases, which made
    -- the result contain duplicate column names; they now have their own.
    count(DISTINCT if(f.is_intention = 1, f.sku_id, NULL)) AS intention_sku_ct,
    count(DISTINCT if(f.is_intention = 1, f.customer_id, NULL)) AS intention_uv,
    count(DISTINCT if(f.is_intention = 1, f.sku_id, NULL), if(f.is_intention = 1, f.customer_id, NULL)) AS intention_sku_uv
FROM (
    SELECT
        d.sku_id,
        d.bu_id,
        d.customer_id,
        -- Events whose SKU has no sku_info match fall into the '-99' / '其他' bucket.
        coalesce(e.cat1_id, '-99') AS cat1_id,
        coalesce(e.cat1_name, '其他') AS cat1_name,
        coalesce(e.cat2_id, '-99') AS cat2_id,
        coalesce(e.cat2_name, '其他') AS cat2_name,
        coalesce(e.is_3p, '其他') AS is_3p,
        d.is_view,
        d.is_intention
    FROM (
        -- Flow events of 20210225 tagged as view and/or intention events.
        SELECT
            if(click.event_type = 'view', 1, 0) AS is_view,
            CASE
                WHEN (click.val_cid = 'page_csu_detail' AND click.val_bid = 'b_htrmpzlb')
                  OR click.val_bid IN ('b_h856xuac', 'b_x6m8625m') THEN 1
                ELSE 0
            END AS is_intention,
            c.sku_id,
            coalesce(cust.bu_id, '-99') AS bu_id,
            click.customer_id
        FROM mart_caterb2b.fact_flow_visit_click_kv_day click
        JOIN mart_caterb2b.dim_caterb2b_csu_his c
            ON click.csu_id = c.csu_id
        LEFT JOIN (
            SELECT DISTINCT
                customer_id,
                bu_id
            FROM mart_caterb2b.dim_caterb2b_customer_his
            WHERE dt = '20210225'
              AND business_type = 2
              AND cat_type = 1
              AND channel_id = '1001'
        ) cust
            ON click.customer_id = cust.customer_id
        WHERE click.dt = '20210225'
          AND click.csu_id IS NOT NULL
          AND c.dt = '20210225'
    ) d
    LEFT JOIN sku_info e
        ON d.sku_id = e.sku_id
       AND d.bu_id = e.bu_id
) f
GROUP BY
    f.bu_id,
    f.cat1_id,
    f.cat1_name,
    f.cat2_id,
    f.cat2_name,
    f.is_3p
GROUPING SETS (
    (f.bu_id, f.cat1_id, f.cat1_name, f.cat2_id, f.cat2_name, f.is_3p),
    (f.bu_id, f.cat1_id, f.cat1_name, f.cat2_id, f.cat2_name),
    (f.bu_id, f.cat1_id, f.cat1_name, f.is_3p),
    (f.bu_id, f.cat1_id, f.cat1_name),
    (f.bu_id, f.is_3p),
    (f.bu_id),
    (f.cat1_id, f.cat1_name, f.cat2_id, f.cat2_name, f.is_3p),
    (f.cat1_id, f.cat1_name, f.cat2_id, f.cat2_name),
    (f.cat1_id, f.cat1_name, f.is_3p),
    (f.cat1_id, f.cat1_name),
    (f.is_3p),
    ()
)

添加参数:

-- Session parameters added in front of the query. Comment lines use the SQL
-- "--" marker (the scraped text had an en dash, which is not a comment).
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
-- Partitions: maximum number of dynamic partitions
SET hive.exec.max.dynamic.partitions=1000;
-- Threshold (1 MB) that triggers Spark small-file merging
SET spark.sql.mergeSmallFileSize=1048576;
-- Default number of Spark shuffle partitions
SET spark.sql.shuffle.partitions=500;
-- Cap the resources the job may request
SET spark.dynamicAllocation.enabled=true;
SET spark.dynamicAllocation.minExecutors=3;
SET spark.dynamicAllocation.maxExecutors=300;
SET spark.executor.cores=2;

如果不添加参数,在最后一步,资源急剧下降,导致执行时间很长

explode 与 lateral view 对比

-- explode() used directly in the SELECT list next to a regular column:
-- one output row per element of split('0,1', ',') for each source row.
SELECT
    user_coupon_id,
    explode(split('0,1', ',')) AS tag
FROM mart_waimai.aggr_act_ord_use_coupon_dd
WHERE dt = '20200920'
LIMIT 10

背景

统计端维度的日活、下单UV、成交UV等,由于端信息只能从流量表里获取,所以需要从流量表关联订单表得到成交信息。

开发过程

原始sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
-- Original (skew-prone) funnel query: per region / management city / device
-- type counts of active, home-exposure, home-click, add-to-cart, order and
-- arranged-order customers plus the converted sales amount for dt = '20210307'.
-- The join shape is kept as in the write-up: flow rows without an order match
-- carry NULL d.dt / d.order_id into the second LEFT JOIN.
SELECT
    b.region_id,
    b.region_name,
    b.management_city_id,
    b.management_city_name,
    a.device_type_id,
    count(DISTINCT a.customer_id) AS dau,
    count(DISTINCT if(a.event_type = 'view'
                      AND a.val_cid = 'page_csu_list', a.customer_id, NULL)) AS home_expose_uv,
    count(DISTINCT if(a.event_type = 'click'
                      AND a.val_cid = 'page_csu_list', a.customer_id, NULL)) AS home_click_uv,
    -- bid aligned with the 'b_h856xuac' literal used by the other queries in
    -- these notes (the original here read 'b_h8ds56xuac').
    count(DISTINCT if(a.val_bid = 'b_h856xuac', a.customer_id, NULL)) AS add_cart_uv,
    count(DISTINCT if(a.val_cid = 'page_order_confirm'
                      AND a.val_bid = 'b_drqbsnud', a.customer_id, NULL)) AS order_uv,
    count(DISTINCT if(c.order_id IS NOT NULL, a.customer_id, NULL)) AS order_arrange_uv,
    -- arranged_amt is produced by subquery c, not by the flow table a.
    sum(if(c.order_id IS NOT NULL, c.arranged_amt, 0)) AS conversion_sales_amt
FROM mart_caterb2b.fact_flow_visit_click_kv_day a
JOIN (
    -- Customer -> region / management city for the same partition.
    SELECT
        m.dt,
        m.customer_id,
        n.region_id,
        n.region_name,
        n.management_city_id,
        n.management_city_name
    FROM mart_caterb2b.dim_caterb2b_customer_his m
    JOIN mart_caterb2b.dim_sm_management_city_info n
        ON m.city_id = n.city_id
    WHERE m.dt = '20210307'
) b
    ON a.customer_id = b.customer_id
   AND a.dt = b.dt
LEFT JOIN (
    -- Parent order -> child order mapping.
    SELECT DISTINCT
        dt,
        order_id,
        parent_order_id
    FROM mart_caterb2b.fact_biz_order_day
    WHERE dt = '20210307'
) d
    ON a.order_id = d.parent_order_id
   AND a.dt = d.dt
LEFT JOIN (
    -- Arranged amount per order.
    SELECT
        dt,
        order_id,
        sum(arranged_amt) AS arranged_amt
    FROM mart_caterb2b.mid_deal_order_item_withpop
    WHERE dt = '20210307'
      AND is_arranged = 1
      AND channel_id = 1001
      AND order_type NOT IN (3, 4) -- exclude replenishment and exchange orders
      AND spu_type != 5 -- exclude packaging items
    GROUP BY
        dt,
        order_id
) c
    ON d.dt = c.dt
   AND d.order_id = c.order_id
WHERE a.dt = '20210307'
GROUP BY
    b.region_id,
    b.region_name,
    b.management_city_id,
    b.management_city_name,
    a.device_type_id

出现数据倾斜

查看出问题stage

定位sql

怀疑是因为a表大量记录关联不上d表,d.dt为null,导致某些节点数据量特别大,所以考虑提前关联c表数据,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
-- Second attempt at the funnel query: per region / management city / device
-- type metrics for partition '$now.datekey', with roll-up rows from GROUPING SETS.
-- The order table (e) is pre-joined to the per-order amounts (c) inside
-- subquery d, so the flow table a is LEFT JOINed to a single order subquery.
SELECT b.region_id,
b.region_name,
b.management_city_id,
b.management_city_name,
a.device_type_id,
count(DISTINCT a.customer_id) AS dau,
count(DISTINCT IF (a.event_type = 'view'
AND a.val_cid = 'page_csu_list', a.customer_id, NULL)) AS home_expose_uv,
count(DISTINCT IF (a.event_type = 'click'
AND a.val_cid = 'page_csu_list', a.customer_id, NULL)) AS home_click_uv,
count(DISTINCT IF (a.val_bid = 'b_h856xuac', a.customer_id, NULL)) AS add_cart_uv,
count(DISTINCT IF (a.val_cid = 'page_order_confirm'
AND a.val_bid = 'b_drqbsnud', a.customer_id, NULL)) AS order_uv,
-- d.order_id comes from c, so it is non-NULL only when the order has rows in c.
count(DISTINCT IF (a.val_cid = 'page_order_confirm'
AND a.val_bid = 'b_drqbsnud'
AND d.order_id IS NOT NULL, a.customer_id, NULL)) AS order_arrange_uv,
sum(IF (a.val_cid = 'page_order_confirm'
AND a.val_bid = 'b_drqbsnud'
AND d.order_id IS NOT NULL, d.arranged_amt, 0)) AS conversion_sales_amt
FROM mart_caterb2b.fact_flow_visit_click_kv_day a
JOIN
-- b: customer -> region / management city for the run partition.
(SELECT m.dt,
m.customer_id,
n.region_id,
n.region_name,
n.management_city_id,
n.management_city_name
FROM mart_caterb2b.dim_caterb2b_customer_his m
JOIN mart_caterb2b.dim_sm_management_city_info n ON m.city_id = n.city_id
WHERE m.dt = '$now.datekey') b ON a.customer_id = b.customer_id
AND a.dt = b.dt
LEFT JOIN
-- d: every order row of the partition, with the arranged amount when c has it.
(SELECT e.dt,
e.parent_order_id,
c.order_id,
c.arranged_amt
FROM mart_caterb2b.fact_biz_order_day e
left JOIN
-- c: arranged amount summed per order.
(SELECT dt,
order_id,
sum(arranged_amt) AS arranged_amt
FROM mart_caterb2b.mid_deal_order_item_withpop
WHERE dt = '$now.datekey'
AND is_arranged = 1
AND channel_id = 1001
AND order_type NOT IN (3,
4)-- exclude replenishment and exchange orders

AND spu_type != 5 -- exclude packaging items

GROUP BY dt,
order_id) c ON e.dt = c.dt
AND e.order_id = c.order_id
WHERE e.dt = '$now.datekey') d ON a.order_id = d.parent_order_id
AND a.dt = d.dt
WHERE a.dt = '$now.datekey'
GROUP BY b.region_id,
b.region_name,
b.management_city_id,
b.management_city_name,
a.device_type_id
-- Roll-ups: full detail, region x device, region x city, region, device, grand total.
GROUPING SETS ((b.region_id, b.region_name, b.management_city_id, b.management_city_name, a.device_type_id),
(b.region_id, b.region_name, a.device_type_id),
(b.region_id, b.region_name, b.management_city_id, b.management_city_name),
(b.region_id, b.region_name),
(a.device_type_id),
())

但是并没有改善

说明a表与d表关联时,能用order_id关联上的只是少部分数据(下单数据),而不能关联上的数据还是被分配到两个节点上处理。

所以只能将数据拆分处理,sql如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
-- Final version: the funnel metrics are computed in two independent branches
-- over the same flow data and merged by summing, then written to the target
-- table's dt partition.
-- flow_log_info: flow events of partition '$now.datekey' inner-joined to the
-- customer's region / management city.
WITH flow_log_info AS
(SELECT a.dt,
b.region_id,
b.region_name,
b.management_city_id,
b.management_city_name,
a.device_type_id,
a.customer_id,
a.event_type,
a.val_cid,
a.val_bid,
a.order_id
FROM mart_caterb2b.fact_flow_visit_click_kv_day a
JOIN
(SELECT m.dt,
m.customer_id,
n.region_id,
n.region_name,
n.management_city_id,
n.management_city_name
FROM mart_caterb2b.dim_caterb2b_customer_his m
JOIN mart_caterb2b.dim_sm_management_city_info n ON m.city_id = n.city_id
WHERE m.dt = '$now.datekey') b ON a.customer_id = b.customer_id
AND a.dt = b.dt
WHERE a.dt = '$now.datekey')

-- dt is the last selected column and feeds the PARTITION (dt) clause.
INSERT OVERWRITE TABLE `${target.table}` PARTITION (dt)
-- NULL dimension values (e.g. on roll-up rows) are labelled '-1' / '全部'.
SELECT coalesce(e.region_id, '-1') AS region_id,
coalesce(e.region_name, '全部') AS region_name,
coalesce(e.management_city_id, '-1') AS management_city_id,
coalesce(e.management_city_name, '全部') AS management_city_name,
coalesce(e.device_type_id, '-1') AS device_type_id,
e.dau,
e.home_expose_uv,
e.home_click_uv,
e.add_cart_uv,
e.order_uv,
e.order_arrange_uv,
e.conversion_sales_amt,
-- The three ratios return 0 instead of dividing by a zero denominator.
if(e.home_expose_uv = 0, 0, 1 - (e.home_click_uv / e.home_expose_uv)) AS bounce_rate,
if(e.dau = 0, 0, e.order_arrange_uv / e.dau) AS dau_arrange_rate,
if(e.dau = 0, 0, e.conversion_sales_amt / e.dau) AS sales_amt_per_uv,
'$now.datekey' AS dt
FROM
-- e: merges the two branches; each branch contributes 0 for the metrics it
-- does not compute, so sum() yields the value from the other branch.
(SELECT c.region_id,
c.region_name,
c.management_city_id,
c.management_city_name,
c.device_type_id,
sum(c.dau) AS dau,
sum(c.home_expose_uv) AS home_expose_uv,
sum(c.home_click_uv) AS home_click_uv,
sum(c.add_cart_uv) AS add_cart_uv,
sum(c.order_uv) AS order_uv,
sum(c.order_arrange_uv) AS order_arrange_uv,
sum(c.conversion_sales_amt) AS conversion_sales_amt
FROM
-- Branch 1: metrics computed from the flow log alone (no order join);
-- the order-derived metrics are zero placeholders.
(SELECT a.region_id,
a.region_name,
a.management_city_id,
a.management_city_name,
a.device_type_id,
count(DISTINCT a.customer_id) AS dau,
count(DISTINCT IF (a.event_type = 'view'
AND a.val_cid = 'page_csu_list', a.customer_id, NULL)) AS home_expose_uv,
count(DISTINCT IF (a.event_type = 'click'
AND a.val_cid = 'page_csu_list', a.customer_id, NULL)) AS home_click_uv,
count(DISTINCT IF (a.val_bid = 'b_h856xuac', a.customer_id, NULL)) AS add_cart_uv,
count(DISTINCT IF (a.val_cid = 'page_order_confirm'
AND a.val_bid = 'b_drqbsnud', a.customer_id, NULL)) AS order_uv,
0 AS order_arrange_uv,
0 AS conversion_sales_amt
FROM flow_log_info a
GROUP BY a.region_id,
a.region_name,
a.management_city_id,
a.management_city_name,
a.device_type_id
GROUPING SETS ((a.region_id, a.region_name, a.management_city_id, a.management_city_name, a.device_type_id),
(a.region_id, a.region_name, a.device_type_id),
(a.region_id, a.region_name, a.management_city_id, a.management_city_name),
(a.region_id, a.region_name),
(a.device_type_id),
())

UNION ALL

-- Branch 2: order-derived metrics; the inner JOIN keeps only flow rows whose
-- order_id matches a parent_order_id, and the flow-only metrics are zero
-- placeholders.
SELECT a.region_id,
a.region_name,
a.management_city_id,
a.management_city_name,
a.device_type_id,
0 AS dau,
0 AS home_expose_uv,
0 AS home_click_uv,
0 AS add_cart_uv,
0 AS order_uv,
count(DISTINCT IF (a.val_cid = 'page_order_confirm'
AND a.val_bid = 'b_drqbsnud'
AND d.order_id IS NOT NULL, a.customer_id, NULL)) AS order_arrange_uv,
sum(IF (a.val_cid = 'page_order_confirm'
AND a.val_bid = 'b_drqbsnud'
AND d.order_id IS NOT NULL, d.arranged_amt, 0)) AS conversion_sales_amt
FROM flow_log_info a
JOIN
-- d: every order row of the partition, with the arranged amount when c has it.
(SELECT e.dt,
e.parent_order_id,
c.order_id,
c.arranged_amt
FROM mart_caterb2b.fact_biz_order_day e
LEFT JOIN
-- c: arranged amount summed per order.
(SELECT dt,
order_id,
sum(arranged_amt) AS arranged_amt
FROM mart_caterb2b.mid_deal_order_item_withpop
WHERE dt = '$now.datekey'
AND is_arranged = 1
AND channel_id = 1001
AND order_type NOT IN (3,
4)-- exclude replenishment and exchange orders

AND spu_type != 5 -- exclude packaging items

GROUP BY dt,
order_id) c ON e.dt = c.dt
AND e.order_id = c.order_id
WHERE e.dt = '$now.datekey') d ON a.order_id = d.parent_order_id
AND a.dt = d.dt
GROUP BY a.region_id,
a.region_name,
a.management_city_id,
a.management_city_name,
a.device_type_id
-- Same grouping sets as branch 1, so both branches emit matching key rows.
GROUPING SETS ((a.region_id, a.region_name, a.management_city_id, a.management_city_name, a.device_type_id),
(a.region_id, a.region_name, a.device_type_id),
(a.region_id, a.region_name, a.management_city_id, a.management_city_name),
(a.region_id, a.region_name),
(a.device_type_id),
()) )c
GROUP BY c.region_id,
c.region_name,
c.management_city_id,
c.management_city_name,
c.device_type_id) e

总结

当用join关联时,一定要注意数据的量级分布,包括能关联上的和不能关联上的,提前计算一下!

原理探究

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#operations-on-streaming-dataframesdatasets

https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html

The Internals of Spark Structured Streaming (Apache Spark 2.4.4)

已读

Spark——Structured Streaming

  1. 整体介绍
  2. watermark工作原理
    1. 使用watermark时,withWatermark与groupBy必须使用同一个时间字段(time field)

未读

一、简介

专访朱诗雄:Apache Spark 中的全新流式引擎 Structured Streaming

看了这篇博客,你还敢说不会Structured Streaming?

Note_Spark_Day13:Structured Streaming

Spark Structured Streaming笔记

Spark教程:Spark Structured Streaming入门编程指南

Spark从入门到精通(09):结构化流(Structured Streaming)(上)

Spark Structured Streaming高级特性

Spark Structured Streaming

二、架构

三、watermark

四、state

五、streaming+kafka

重启任务如何保证紧接上一次消费

官网:Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

Spark streaming消费Kafka的正确姿势

Tutorial: Use Apache Spark Structured Streaming with Apache Kafka on HDInsight

Processing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2

How to write spark streaming DF to Kafka topic

Apache Kafka transactional writer with foreach sink, is it possible?

spark向kafka写入数据

Spark : Best way to Broadcast KafkaProducer to Spark streaming

Spark Dataframe to Kafka

How to write a Dataset to Kafka topic?

六、文件

Spark2.0入门:Structured Streaming操作文件流

Spark之Spark Streaming处理文件流数据

Spark2.0入门:Structured Streaming操作文件流

spark流式读取hdfs中数据

七、config

Configuration Properties

sqlconf.scala

八、stage

InMemoryTableScanExec Leaf Physical Operator

九、源码

Spark Structrued Streaming源码分析–(三)Aggreation聚合状态存储与更新

十、监控

ProgressReporter Contract

StreamingQueryListener — Intercepting Life Cycle Events of Streaming Queries

如何使用 Spark 3.0 中新加的 Structured Streaming UI 来进行异常分析

二十、未分类

MicroBatchExecution — Stream Execution Engine of Micro-Batch Stream Processing

spark运行没有启动job

Spark Structured Streaming app has no jobs and no stages

问题:

  1. spark broadcast kafkaproducer
  2. dataframe write to kafka each
  3. structured streaming 流式读取文件 多层
  4. structured streaming finished without job

Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode

Structured Streaming Programming Guide

这个状态执行几分钟

1
21/09/08 07:09:55 INFO StateStore [state-store-maintenance-task]: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@2f640a0b

Streaming job gets stuck writing to checkpoint

Spark stateful streaming processing is stuck in StateStoreSave stage!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
User class threw exception: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
metricValueProvider (org.apache.kafka.common.metrics.KafkaMetric)
metrics (org.apache.kafka.common.metrics.Metrics)
metrics (org.apache.kafka.clients.producer.internals.BufferPool)
free (org.apache.kafka.clients.producer.internals.RecordAccumulator)
accumulator (org.apache.kafka.clients.producer.KafkaProducer)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:291)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:292)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1493)
at com.microsoft.sam.HistoryDataUploader.run(HistoryDataUploader.scala:121)
at com.microsoft.sam.HistoryDataUploader$.main(HistoryDataUploader.scala:207)
at com.microsoft.sam.HistoryDataUploader.main(HistoryDataUploader.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.RuntimeException: Could not serialize lambda
at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:69)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
... 35 more
Caused by: java.lang.NoSuchMethodException: org.apache.kafka.common.network.Selector$SelectorMetrics$$Lambda$38/747274186.writeReplace()
at java.lang.Class.getDeclaredMethod(Class.java:2130)
at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:60)

spark3.0运行,延迟很长时间才调用 onQueryProgress,2.4上没有遇到该问题

Spark Structured Streaming StreamingQueryListener.onQueryProgress not called per microbatch?