Как структурированная потоковая передача планирует логический план потокового запроса для каждой микропакета? - PullRequest
2 голосов
/ 31 октября 2019

Я настроил небольшой тест на своем ноутбуке, который выполняет следующее:

Я создал тему Kafka с несколькими 1000 сообщениями, где каждое сообщение содержит несколько строк, в каждой строке около 100 столбцов.

Создайте 300 довольно сложных столбцов Spark в списке [Column]. Нет агрегации.

При настройке потока из Kafka я устанавливаю .option ("maxOffsetsPerTrigger", 1), поэтому в каждой мини-партии обрабатывается только одно сообщение.

Затем я применяю столбцыдля мини-пакетов, состоящих только из одного сообщения.

val testStream = myStream
  .select(manyTestCols :_*)
  .writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

Для обработки каждого сообщения Spark требуется около 10 секунд.

Затем я изменяю maxOffsetsPerTrigger на .option ("maxOffsetsPerTrigger", 1000)Таким образом, в каждом мини-пакете обрабатывается 1000 сообщений.

Spark требуется около 11 секунд для обработки всех 1000 сообщений в каждом мини-пакете.

Итак, кажется, что Spark делает что-то вроде ""работа по настройке", а затем достаточно быстро обрабатывает каждую мини-партию, как только она начинает работать.

Эта "работа по настройке" проходит от планирования запроса до физического плана для каждой мини-партии?

Если да, то имеет ли смысл Spark делать это в каждой мини-партии?

Или что-то еще происходит? Смотрю исходный код Spark, но был бы признателен за отзывы от кого-то, кто уже прошел это упражнение.

Tx для любых идей.

1 Ответ

1 голос
/ 04 ноября 2019

Эта "работа по настройке" проходит через планирование запроса до физического плана для каждой мини-партии?

Частично да для специфичных для выполнения частей плана запросапотокового запроса, который должен быть заполнен во время выполнения следующим образом (со ссылками на соответствующие части кода):

  1. Правильные отношения для источников данных (например, LocalRelation для источников без данных)
  2. Водяной знак времени события
  3. Текущее (микропакетное) время

Если это так, имеет ли смысл Spark делать это в каждой мини-партии?

Абсолютно. В структурированной потоковой передаче нет другого пути к источникам короткого замыкания без данных, отслеживанию текущего времени и водяного знака.

Это также причина для дополнительной микропакеты без данных для операторов с состоянием когда, скажем, меняется водяной знак.

Просматриваю исходный код Spark, но был бы признателен за отзывы от кого-то, кто уже прошел это упражнение.

См. MicroBatchExecution и IncrementalExecution .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...