Spark Streaming commitASync: OffsetOutOfRangeException - PullRequest
1 голос
/ 24 апреля 2019

Я разработал приложение Spark Streaming, которое работает на 120-секундных микропакетах, по сути, преобразовывая файлы JSON в файлы Avro.

С тех пор, как я начал использовать новую тему Kafka, я столкнулся с проблемой, в которой я случайно получаю OffsetOutOfRangeException при каждых нескольких микропакетах.

Эти микропакеты очень малы, около 20 сообщенийи займет всего несколько секунд, чтобы завершить.

В конце своей программы я фиксирую ASync и могу подтвердить, используя OffsetCallBack, что эти коммиты успешны для Kafka.

Единственный способ устранить эту ошибку - зеркальное отображение сообщений в другой теме Kafka (с теми же свойствами, что и в первой).

Используя основную тему, я попытался решить проблему на уровне приложения, увеличив ресурсы Spark (память исполнителя, память драйвера, очередь YARN), отключив кэш-память Kafka, обновив Spark до 2.4.0, уменьшиввремя микропакета до 30 с, 60 с, увеличение моей настройки отказоустойчивости, чтобы допускать больше ошибок

b'2019-04-24 12:24:01,218 INFO  - Computing topic xx.xx.xx.xx.xx, partition 0 offsets 1190252 -> 1190271\n'
b'2019-04-24 12:26:00,003 INFO  - xx.xx.xx.xx.EntryPoint$$anonfun$main$13$$anon$1: Offset onComplete for - xx.xx.xx.xx.xx- 0 - 1190252\n'
b'2019-04-24 12:26:00,003 INFO  - xx.xx.xx.xx.EntryPoint$$anonfun$main$13$$anon$1: Offsets successfully committed\n'
b'2019-04-24 12:26:00,092 INFO  - Computing topic xx.xx.xx.xx.xx, partition 0 offsets 1190271 -> 1190295\n'
b'2019-04-24 12:26:00,114 WARN  - Lost task 0.0 in stage 8.0 (TID 27, xx, executor 1): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {xx.xx.xx.xx.xx-0=1190272}\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:589)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:355)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:136)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:71)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:271)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:2...
b'2019-04-24 12:26:00,114 WARN  - Lost task 0.0 in stage 8.0 (TID 27, xx, executor 1): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {xx.xx.xx.xx.xx-0=1190272}\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:589)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:355)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:136)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:71)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:271)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:2...
b'2019-04-24 12:26:00,119 INFO  - Computing topic xx.xx.xx.xx.xx, partition 0 offsets 1190271 -> 1190295\n'
b'2019-04-24 12:26:00,247 WARN  - Lost task 0.1 in stage 8.0 (TID 28, xx, executor 1): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {xx.xx.xx.xx.xx-0=1190271}\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:589)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:355)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:136)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:68)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:271)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:2...
b'2019-04-24 12:26:00,247 WARN  - Lost task 0.1 in stage 8.0 (TID 28, xx, executor 1): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {xx.xx.xx.xx.xx-0=1190271}\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:589)\n\tat org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:355)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:136)\n\tat org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:68)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:271)\n\tat org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:2...
b'2019-04-24 12:26:00,252 INFO  - Computing topic xx.xx.xx.xx.xx, partition 0 offsets 1190271 -> 1190295\n'

Я почти исключил все, что я могу сделать на уровне приложения, почему я не могу прочитать этисмещения от Кафки, когда я явно зафиксировал их в моем предыдущем запуске?

...