Перезапуск задания структурированной потоковой передачи Spark потребляет миллионы сообщений Kafka и умирает - PullRequest
7 голосов
/ 02 апреля 2019

У нас есть Spark Streaming Application, работающий на Spark 2.3.3

По сути, он открывает поток Кафки:

  kafka_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "mykafka:9092") \
  .option("subscribe", "mytopic") \
  .load()

В теме кафки есть 2 раздела. После этого в столбце есть несколько базовых операций фильтрации, некоторые пользовательские функции Python и explode (), например:

   stream = apply_operations(kafka_stream)

где apply_operations выполняет всю работу с данными. В конце мы хотели бы записать поток в приемник, т.е. e.:

   stream.writeStream \
   .format("our.java.sink.Class") \
   .option("some-option", "value") \
   .trigger(processingTime='15 seconds') \
   .start()

Чтобы эта потоковая операция работала вечно, мы применяем:

   spark.streams.awaitAnyTermination()

В конце концов.

Пока все хорошо. Все работает в течение нескольких дней. Но из-за проблем с сетью задание умерло на несколько дней, и теперь в потоке kafka миллионов сообщений ожидают, чтобы их догнать.

Когда мы перезапускаем задание потоковой передачи данных, используя spark-submit, первая партия будет слишком большой и займет много времени. Мы подумали, что может быть способ ограничить размер первого пакета каким-либо параметром, но мы не нашли ничего, что могло бы помочь.

Мы пытались:

  • spark.streaming.backpressure.enabled = true вместе с spark.streaming.backpressure.initialRate = 2000 и spark.streaming.kafka.maxRatePerPartition = 1000 и spark.streaming.receiver.maxrate = 2000

  • установка параметра spark.streaming.backpressure.pid.minrate на более низкое значение также не оказала влияния

  • установка опции ("maxOffsetsPerTrigger", 10000) также не имела эффекта

Теперь, после того, как мы перезапустим конвейер, рано или поздно вся работа Spark снова рухнет. Мы не можем просто расширить память или ядра, которые будут использоваться для искровой работы.

Есть что-то, что мы пропустили, чтобы контролировать количество событий, обрабатываемых в одном потоке?

1 Ответ

3 голосов
/ 11 апреля 2019

Вы написали в комментариях, что используете spark-streaming-kafka-0-8_2.11 и что версия API не может обрабатывать maxOffsetPerTrigger (или любой другой механизм для уменьшения количества потребляемыхсообщения, насколько я знаю), поскольку это было только реализовано для более новых API spark-streaming-kafka-0-10_2.11 .Этот более новый API также работает с вашей версией kafka 0.10.2.2 согласно документации .

...