Я разработал потоковую искру (1.6.2) с Kafka в модели приемника и выполняю это задание с размером пакета в 15 секунд.
Самый первый пакет получает много событий и обрабатывает записи очень медленно.Внезапно работа терпит неудачу, и это возобновляется снова.Пожалуйста, смотрите скриншот ниже.
Он обрабатывает записи медленно, но не так, как ожидалось, чтобы завершить все эти пакеты вовремя и не хочет, чтобы эта очередь накапливалась.
Как мы можем контролировать этот размер входных данных для событий от 15 до 20k?Я попытался включить spark.streaming.backpressure.enabled, но никаких улучшений не увидел.
Я также реализовал уровень параллелизма при получении данных, как показано ниже, но все же я не увидел никаких изменений в размере ввода.
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
Я использую 6 исполнителей и 20 ядер.
Обзор моего кода:
Я читаю журналы от Kafka и обрабатываю их и храню вasticsearch в каждомИнтервал между партиями 15 секунд.
Не могли бы вы дать мне знать, как я могу контролировать размер ввода и улучшить производительность задания или как мы можем убедиться, что партии не накапливаются.