Spark Streaming на Compact Kafka topi c - потребителям 1 сообщение на раздел на пакет - PullRequest
0 голосов
/ 01 апреля 2020

Наше потоковое задание Spark работало нормально, как и ожидалось (количество событий, обрабатываемых в пакете). Но по некоторым причинам мы добавили сжатие на Kafka topi c и возобновили работу. Но после перезапуска произошел сбой по следующей причине:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 16 in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in stage 2.0 (TID 231, 10.34.29.38, executor 4): java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39 even after seeking to offset 106847 got offset 199066 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)

Итак, я добавил spark.streaming.kafka.allowNonConsecutiveOffsets: true в конфигурации config и изменил имя группы для использования с начала. Теперь проблема в том, что он читает только одно сообщение из каждого раздела. Таким образом, если topi c имеет 50 разделов, то его чтение 50 сообщений на пакет (длительность пакета составляет 5 сек c).

Топи c составляет 1 млн. Записей, и потребитель имеет огромное отставание.

Журнал драйверов, который выбирает 1 сообщение на раздел.

20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
20/03/31 18:27:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211967.
20/03/31 18:27:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211968.
20/03/31 18:27:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211969.
20/03/31 18:27:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45  to offset 211970. 

Spark Config (batch.duration: 5, используя Spark Stream):

 spark.shuffle.service.enabled: "true"
  spark.streaming.backpressure.enabled: "true"
  spark.streaming.concurrentJobs: "1"
  spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"
  spark.streaming.backpressure.pid.minRate: 1500
  spark.streaming.backpressure.initialRate: 100
  spark.streaming.kafka.allowNonConsecutiveOffsets: true

Есть ли какие-либо проблемы в моей конфигурации или что-то особенное, что требуется с компактной Kafka topi c чего мне не хватает?

...