На HDP у нас есть чтение Spark Streaming от Kafka с Direct API. Версии:
HDP: 3.1.0.0-78
Scala: 2.11
Spark: 2.3.2
Kafka: 2.2.0
Работа в нашей тестовой среде в течение нескольких недель выполнялась нормально, но в итоге произошел сбой с приведенным ниже исключением. Работа уже потребляла более 200 миллионов записей, и мы развернули ее в нашей производственной среде, где она потерпела крах, потребив всего несколько миллионов сообщений. (Тип сжатия был zstd
.)
Мы пытались воспроизвести исключение в среде разработки, но не смогли скопировать подозрительные данные из тестирования / производства в разработку. Данные выглядели так, как ожидалось, и мы не могли распознать какую-либо закономерность, почему и когда возникает эта ошибка. Кроме того, мы могли бы использовать проблемные смещения c с Kafka-Console-Consumer без каких-либо проблем.
В Google нет ничего, что можно найти об этом исключении, кроме кода Кафки на Github . Чтобы сэкономить время, я хотел бы поделиться нашим опытом с этими вопросами и ответами.
Полная трассировка стека ошибки:
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from myTopic-0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1231)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1238)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:136)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:71)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:271)
at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:231)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:116)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:108)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.record.InvalidRecordException: Incorrect declared batch size, records still remaining in file