У нас есть Spark Streaming Application, работающий на Spark 2.3.3
По сути, он открывает поток Кафки:
kafka_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "mykafka:9092") \
.option("subscribe", "mytopic") \
.load()
В теме кафки есть 2 раздела. После этого в столбце есть несколько базовых операций фильтрации, некоторые пользовательские функции Python и explode (), например:
stream = apply_operations(kafka_stream)
где apply_operations выполняет всю работу с данными. В конце мы хотели бы записать поток в приемник, т.е. e.:
stream.writeStream \
.format("our.java.sink.Class") \
.option("some-option", "value") \
.trigger(processingTime='15 seconds') \
.start()
Чтобы эта потоковая операция работала вечно, мы применяем:
spark.streams.awaitAnyTermination()
В конце концов.
Пока все хорошо. Все работает в течение нескольких дней. Но из-за проблем с сетью задание умерло на несколько дней, и теперь в потоке kafka миллионов сообщений ожидают, чтобы их догнать.
Когда мы перезапускаем задание потоковой передачи данных, используя spark-submit, первая партия будет слишком большой и займет много времени. Мы подумали, что может быть способ ограничить размер первого пакета каким-либо параметром, но мы не нашли ничего, что могло бы помочь.
Мы пытались:
spark.streaming.backpressure.enabled = true вместе с spark.streaming.backpressure.initialRate = 2000 и spark.streaming.kafka.maxRatePerPartition = 1000 и spark.streaming.receiver.maxrate = 2000
установка параметра spark.streaming.backpressure.pid.minrate на более низкое значение также не оказала влияния
установка опции ("maxOffsetsPerTrigger", 10000) также не имела эффекта
Теперь, после того, как мы перезапустим конвейер, рано или поздно вся работа Spark снова рухнет. Мы не можем просто расширить память или ядра, которые будут использоваться для искровой работы.
Есть что-то, что мы пропустили, чтобы контролировать количество событий, обрабатываемых в одном потоке?