How to Avoid the ReceiverDisconnectedException
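
A Spark Structured Streaming job that reads from Azure Event Hubs can abort with com.microsoft.azure.eventhubs.ReceiverDisconnectedException. Event Hubs admits only one epoch receiver per partition per consumer group, so the moment a second receiver attaches to a partition that is already being read in the same consumer group, the service disconnects the existing receiver and the corresponding Spark task fails. Once a task has failed four times (Spark's default retry limit), the whole job is aborted: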

21/08/19 02:18:45 ERROR TaskSetManager [task-result-getter-0]: Task 32 in stage 73.0 failed 4 times; aborting job
21/08/19 02:18:45 ERROR FileFormatWriter [stream execution thread for [id = 5f382b82-2006-49a4-a657-5512d5bc6cce, runId = e783c09d-8ea7-4848-b018-2422590426ce]]: Aborting job 6d36e286-ecb0-4a8e-9928-5ac7a0fbb7d2.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 73.0 failed 4 times, most recent failure: Lost task 32.3 in stage 73.0 (TID 18190, wn126-msnbi.awfbdxsze1iudhhki0l2sbzfaf.bx.internal.cloudapp.net, executor 8): java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-26-18237' with higher epoch of '0' is created hence current receiver 'spark-8-18190' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:ddef38220000007a00008bf9611dbf7b_G4_B16, SystemTracker:msnsam-prod:eventhub:sambeacon-ueq~4223|$default, Timestamp:2021-08-19T02:18:45, errorContext[NS: msnsam-prod.servicebus.windows.net, PATH: sambeacon-ueq/ConsumerGroups/$Default/Partitions/32, REFERENCE_ID: LN_23ed71_1629339510773_d5c9_G4, PREFETCH_COUNT: 500, LINK_CREDIT: 200, PREFETCH_Q_LEN: 0]
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.completeExceptionally(ExceptionUtil.java:116)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.drainPendingReceives(MessageReceiver.java:505)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onError(MessageReceiver.java:490)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:790)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.processOnClose(BaseLinkHandler.java:73)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.handleRemoteLinkClosed(BaseLinkHandler.java:109)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:51)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-26-18237' with higher epoch of '0' is created hence current receiver 'spark-8-18190' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:ddef38220000007a00008bf9611dbf7b_G4_B16, SystemTracker:msnsam-prod:eventhub:sambeacon-ueq~4223|$default, Timestamp:2021-08-19T02:18:45, errorContext[NS: msnsam-prod.servicebus.windows.net, PATH: sambeacon-ueq/ConsumerGroups/$Default/Partitions/32, REFERENCE_ID: LN_23ed71_1629339510773_d5c9_G4, PREFETCH_COUNT: 500, LINK_CREDIT: 200, PREFETCH_Q_LEN: 0]
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:43)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:789)
    ... 15 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2065)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
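
The Caused by line spells out the root cause: a new receiver ('spark-26-18237') attached to partition 32 of the $Default consumer group and evicted the existing receiver ('spark-8-18190'), both at epoch '0'. With the azure-event-hubs-spark connector this usually means more than one consumer is reading through the same consumer group: a second streaming query, a concurrent run of the same job, or an unrelated client sitting on $Default. The way to avoid the exception is therefore to create a dedicated consumer group for each streaming query, so that no two receivers ever compete for the same partition. Below is a minimal sketch, assuming the azure-event-hubs-spark connector; the namespace, keys, and hub name are placeholders, and the consumer group "spark-query-1" is hypothetical, one you would create on the Event Hub beforehand:

import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.SparkSession

object DedicatedConsumerGroup {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("eventhubs-reader")
      .getOrCreate()

    // Placeholder connection details; substitute your own namespace and hub.
    val connectionString = ConnectionStringBuilder(
        "Endpoint=sb://<namespace>.servicebus.windows.net/;" +
          "SharedAccessKeyName=<key-name>;SharedAccessKey=<key>")
      .setEventHubName("<eventhub-name>")
      .build

    val ehConf = EventHubsConf(connectionString)
      // A consumer group owned by this query alone, not the shared $Default.
      .setConsumerGroup("spark-query-1")
      .setStartingPosition(EventPosition.fromEndOfStream)

    // One receiver per partition within this consumer group, so nothing
    // else can attach and trigger a ReceiverDisconnectedException.
    val stream = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)
      .load()

    stream.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/eventhubs-checkpoint")
      .start()
      .awaitTermination()
  }
}

Two follow-up points, both consequences of the one-receiver-per-partition rule. First, leave speculative execution off (spark.speculation=false, Spark's default): a speculative duplicate of a task would open a second receiver against the same partition and consumer group and trigger exactly this exception. Second, the exception can still appear transiently when a failed task is retried, because the retry's receiver briefly overlaps with the one it replaces; those occurrences normally resolve on their own once the newer receiver wins.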