Проблемы масштабирования окна сеанса Flink на YARN, Kafka - PullRequest
1 голос
/ 02 марта 2020

Вариант использования задания:

Создание групп событий (транзакций), которые связаны друг с другом, на основе членов события и времени начала события, из 3 потоков, не полагаясь на время обработки. У нас есть входная пропускная способность arround 20K событий / сек c.

Все события транзакции отправляются нескольким темам kafka (источникам заданий) в конце транзакции, с возможностью опоздания до минут.

Мы хотим создать группы событий, у которых промежуток времени начала меньше нескольких секунд и которые определяются бизнес-ключом (например, страна и тип транзакции). Транзакции могут длиться от нескольких секунд до нескольких часов, но все события, образующие группу транзакций, должны поступить в течение ~ 5 минут после окончания транзакции.

Реализация:

Задание использует 3 источника данных, назначает временные метки и водяные знаки, отображает события в общий интерфейс, объединяет потоки, объединяет объединенный поток в поток, создает окно сеанса по времени окончания транзакции с опозданием, фильтрует группы с огромными числами (причина бизнес), создает подпрограмму -группы (на основе времени начала) внутри групп из окна сеанса и, наконец, отправляет результаты в другую топи c.

Проблема:

Когда при выполнении задания в кластере YARN с разъемами kafka в качестве ввода / вывода задание выполняется медленно и плохо масштабируется. (~ 3K событий / se c на паралеллизме 1, до 5K событий / se c на параллелизме 10)

Что я пробовал:

  • Играя с масштабированием ТМ, слотов и памяти, на максимуме у нас было около 10 ТМ с использованием 2 слотов, каждый с памятью 32 ГБ. (Поддерживая один и тот же паралелизм на разделах kafka как на входе, так и на выходе)
  • Воспроизведение с помощью taskmanager.network.memory.fraction, containerized.heap-cutoff-ratio, taskmanager.memory.fraction
  • Игра вокруг с kafka.producer.batch.size, kafka.producer.max.request.size
  • Установка fs, а затем бэкэнда rocksd и установка для taskmanager.memory.off-heap значения true, taskmanager.memory .preallocate to true
  • Отключение контрольных точек
  • Обновление Flink до 1.9.2, kafka-client до последних использованных версий, серверы kafka, confluent и cloudera ...

Код:

Я воссоздал упрощенный вариант использования, код здесь: https://github.com/vladimirtomecko/flink-playground и работа выглядит в основном так:

implicit val (params, config) = PersonTransactionCorrelationConfig.parse(args)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val aTransactions: DataStream[PersonTransactionUniversal] =
  env
    .addSource(???)
    .assignTimestampsAndWatermarks(???)
    .flatMap(_.toUniversal: Option[PersonTransactionUniversal])

val bTransactions: DataStream[PersonTransactionUniversal] =
  env
    .addSource(???)
    .assignTimestampsAndWatermarks(???)
    .flatMap(_.toUniversal: Option[PersonTransactionUniversal])

val cTransactions: DataStream[PersonTransactionUniversal] =
  env
    .addSource(???)
    .assignTimestampsAndWatermarks(???)
    .flatMap(_.toUniversal: Option[PersonTransactionUniversal])

aTransactions
  .union(bTransactions)
  .union(cTransactions)
  .keyBy(new PersonTransactionKeySelector)
  .window(EventTimeSessionWindows.withGap(Time.seconds(config.gapEndSeconds)))
  .allowedLateness(Time.seconds(config.latenessSeconds))
  .aggregate(new PersonTransactionAggregateFunction)
  .filter(new PersonTransactionFilterFunction(config.groupMaxSize))
  .flatMap(new PersonTransactionFlatMapFunction(config.gapStartSeconds * 1000))
  .addSink(new DiscardingSink[PersonTransactionsGroup])


val backend = new RocksDBStateBackend(config.checkpointsDirectory, true)
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
env.setStateBackend(backend: StateBackend)

env.execute(getClass.getSimpleName)

Вопрос:

  • Моя реализация неверна для этого варианта использования?
  • Есть что-то, что я пропустил?
  • Есть ли какая-нибудь другая оптимизация, которую я могу попробовать?
  • У меня есть проблемы с нахождением узкого места в этом сценарии, есть ли какие-то советы?

Спасибо

PS Постер с первого раза, будьте добры, пожалуйста.

Редактировать 1:

Входные темы кафки разделены на стороне производителя с помощью той же функции, что и в keyBy.

Количество разделов равно или точно в 2 раза больше, чем параллелизм потока.

Разделов на топи c имеют аналогичное количество событий (отклонение ~ 5-10%).

Темы заполнены различным количеством событий (A имеет в 10 раз больше событий, чем B, B имеет в 1000 раз больше событий, чем C), но на самом деле это не должно быть проблемой.

...