Как установить оптимальные значения конфигурации - время запуска, maxOffsetsPerTrigger - для структурированной потоковой передачи Spark при чтении сообщений из Kafka? - PullRequest
1 голос
/ 17 июня 2019

У меня есть приложение структурированного потокового вещания, которое читает сообщения от Кафки. Общее количество сообщений в день составляет приблизительно 18 миллиардов с максимальным числом сообщений в минуту = 12 500 000. Максимальный размер сообщения составляет 2 КБ.

Как мне убедиться, что мое приложение структурированной потоковой передачи способно справиться с таким большим объемом и скоростью данных? По сути, я просто хочу знать, как установить оптимальное время запуска, maxOffsetsPerTrigger или любую другую конфигурацию, которая обеспечивает бесперебойную работу и способна обрабатывать сбои и перезапуски.

1 Ответ

1 голос
/ 18 июня 2019

Вы можете запустить приложение потоковой структурированной искры в виде микропартий с фиксированным интервалом или непрерывно.Вот некоторые параметры, которые можно использовать для настройки потоковых приложений.

Конфигурации Kafka:

Количество разделов в Kafka:

Вы можете увеличитьколичество перегородок в кафке.В результате большее количество потребителей могут читать данные одновременно.Задайте для него соответствующее число в зависимости от скорости ввода и количества серверов начальной загрузки.

Конфигурации потоковой передачи Spark:

Конфигурация памяти драйвера и исполнителя:

Рассчитатьразмер данных (#records * размер каждого сообщения) в каждом пакете и соответственно установить память.

Количество исполнителей:

Установить количество исполнителей в число разделов в теме кафки,Это увеличивает параллелизм.Количество задач, которые читают данные одновременно.

Ограничить количество смещений:

Ограничение скорости на максимальное количество смещений, обрабатываемых за интервал запуска.Указанное общее количество смещений будет пропорционально разделено по теме. Разделы разного объема.

  val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topicName")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "1000000")
    .load()

Восстановление после сбоев с указанием проверки:

В случае сбоя или преднамеренного выключения выможет восстановить предыдущий прогресс и состояние предыдущего запроса и продолжить с того места, где он был прерван.Это делается с помощью журналов проверки и записи с опережением.

finalDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()

Триггер:

Настройки триггера потокового запроса определяют время обработки потоковых данных, независимо от того, выполняется ли запросвыполняется как микропакетный запрос с фиксированным интервалом или как запрос непрерывной обработки.

...