структурированная потоковая передача выбрала неверное текущее смещение от кафки - PullRequest
0 голосов
/ 24 января 2019

при запуске структурированной потоковой передачи с использованием lib: "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0", мы продолжаем получать ошибку относительно текущей выборки смещения:

Причина: org.apache.spark.SparkException: задание прервано из-за стадии Ошибка: Задача 0 на этапе 0.0 не выполнена 4 раза, последний сбой: Потерян Задание 0.3 на этапе 0.0 (TID 3, qa2-hdp-4.acuityads.org, исполнитель 2): java.lang.AssertionError: утверждение не выполнено: последние изменения и т. д. -9223372036854775808 не равно -1 в scala.Predef $ .assert (Predef.scala: 170) в org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange (KafkaMicroBatchReader.scala: 371) в . Org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader (KafkaMicroBatchReader.scala: 329) в org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader (KafkaMicroBatchReader.scala: 314) в org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute (DataSourceRDD.scala: 42) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 52) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 52) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 52) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 324) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 288) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 99) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 55) в org.apache.spark.scheduler.Task.run (Task.scala: 121) в org.apache.spark.executor.Executor $ TaskRunner $$ anonfun $ 10.apply (Executor.scala: 402) в org.apache.spark.util.Utils $ .tryWithSafeFinally (Utils.scala: 1360) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 408) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745)

по какой-то причине похоже, что fetchLatestOffset вернул Long.MIN_VALUE для одного из разделов. Я проверил контрольную точку структурированной потоковой передачи, которая была правильной, это значение currentAvailableOffset было установлено в Long.MIN_VALUE.

версия брокера kafka: 1.1.0. lib мы использовали:

{{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" }}

как воспроизвести: в основном мы запустили структурированный стример и подписали тему на 4 раздела. затем выдал несколько сообщений в тему, работа потерпела крах и зарегистрировала трассировку стека, как указано выше.

также зафиксированные смещения кажутся хорошими, как мы видим в журналах:

=== Streaming Query ===
Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 31878627-d473-4ee8-955d-d4d3f3f45eb9]
Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":1}}}
Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":-9223372036854775808}}}

так что потоковая передача в режиме искры записала правильное значение для раздела: 0, но текущие доступные смещения, возвращаемые из kafka, показывают Long.MIN_VALUE.

1 Ответ

0 голосов
/ 25 января 2019

обнаружил проблему, это связано с целочисленным переполнением внутри библиотеки структурированного потокового воспроизведения. подробности размещены здесь: https://issues.apache.org/jira/browse/SPARK-26718

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...