при запуске структурированной потоковой передачи с использованием lib: "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
, мы продолжаем получать ошибку относительно текущей выборки смещения:
Причина: org.apache.spark.SparkException: задание прервано из-за стадии
Ошибка: Задача 0 на этапе 0.0 не выполнена 4 раза, последний сбой: Потерян
Задание 0.3 на этапе 0.0 (TID 3, qa2-hdp-4.acuityads.org, исполнитель 2):
java.lang.AssertionError: утверждение не выполнено: последние изменения и т. д.
-9223372036854775808 не равно -1 в scala.Predef $ .assert (Predef.scala: 170) в
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange (KafkaMicroBatchReader.scala: 371)
в
. Org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader (KafkaMicroBatchReader.scala: 329)
в
org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader (KafkaMicroBatchReader.scala: 314)
в
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute (DataSourceRDD.scala: 42)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в
org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 52)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в
org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 52)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в
org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 52)
в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в
org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в
org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 99)
в
org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 55)
в org.apache.spark.scheduler.Task.run (Task.scala: 121) в
org.apache.spark.executor.Executor $ TaskRunner $$ anonfun $ 10.apply (Executor.scala: 402)
в org.apache.spark.util.Utils $ .tryWithSafeFinally (Utils.scala: 1360)
в
org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 408)
в
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
в
java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617)
at java.lang.Thread.run (Thread.java:745)
по какой-то причине похоже, что fetchLatestOffset вернул Long.MIN_VALUE для одного из разделов. Я проверил контрольную точку структурированной потоковой передачи, которая была правильной, это значение currentAvailableOffset было установлено в Long.MIN_VALUE.
версия брокера kafka: 1.1.0.
lib мы использовали:
{{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" }}
как воспроизвести:
в основном мы запустили структурированный стример и подписали тему на 4 раздела. затем выдал несколько сообщений в тему, работа потерпела крах и зарегистрировала трассировку стека, как указано выше.
также зафиксированные смещения кажутся хорошими, как мы видим в журналах:
=== Streaming Query ===
Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 31878627-d473-4ee8-955d-d4d3f3f45eb9]
Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":1}}}
Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":-9223372036854775808}}}
так что потоковая передача в режиме искры записала правильное значение для раздела: 0, но текущие доступные смещения, возвращаемые из kafka, показывают Long.MIN_VALUE.