Вариант использования задания:
Создание групп событий (транзакций), которые связаны друг с другом, на основе членов события и времени начала события, из 3 потоков, не полагаясь на время обработки. У нас есть входная пропускная способность arround 20K событий / сек c.
Все события транзакции отправляются нескольким темам kafka (источникам заданий) в конце транзакции, с возможностью опоздания до минут.
Мы хотим создать группы событий, у которых промежуток времени начала меньше нескольких секунд и которые определяются бизнес-ключом (например, страна и тип транзакции). Транзакции могут длиться от нескольких секунд до нескольких часов, но все события, образующие группу транзакций, должны поступить в течение ~ 5 минут после окончания транзакции.
Реализация:
Задание использует 3 источника данных, назначает временные метки и водяные знаки, отображает события в общий интерфейс, объединяет потоки, объединяет объединенный поток в поток, создает окно сеанса по времени окончания транзакции с опозданием, фильтрует группы с огромными числами (причина бизнес), создает подпрограмму -группы (на основе времени начала) внутри групп из окна сеанса и, наконец, отправляет результаты в другую топи c.
Проблема:
Когда при выполнении задания в кластере YARN с разъемами kafka в качестве ввода / вывода задание выполняется медленно и плохо масштабируется. (~ 3K событий / se c на паралеллизме 1, до 5K событий / se c на параллелизме 10)
Что я пробовал:
- Играя с масштабированием ТМ, слотов и памяти, на максимуме у нас было около 10 ТМ с использованием 2 слотов, каждый с памятью 32 ГБ. (Поддерживая один и тот же паралелизм на разделах kafka как на входе, так и на выходе)
- Воспроизведение с помощью taskmanager.network.memory.fraction, containerized.heap-cutoff-ratio, taskmanager.memory.fraction
- Игра вокруг с kafka.producer.batch.size, kafka.producer.max.request.size
- Установка fs, а затем бэкэнда rocksd и установка для taskmanager.memory.off-heap значения true, taskmanager.memory .preallocate to true
- Отключение контрольных точек
- Обновление Flink до 1.9.2, kafka-client до последних использованных версий, серверы kafka, confluent и cloudera ...
Код:
Я воссоздал упрощенный вариант использования, код здесь: https://github.com/vladimirtomecko/flink-playground и работа выглядит в основном так:
implicit val (params, config) = PersonTransactionCorrelationConfig.parse(args)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val aTransactions: DataStream[PersonTransactionUniversal] =
env
.addSource(???)
.assignTimestampsAndWatermarks(???)
.flatMap(_.toUniversal: Option[PersonTransactionUniversal])
val bTransactions: DataStream[PersonTransactionUniversal] =
env
.addSource(???)
.assignTimestampsAndWatermarks(???)
.flatMap(_.toUniversal: Option[PersonTransactionUniversal])
val cTransactions: DataStream[PersonTransactionUniversal] =
env
.addSource(???)
.assignTimestampsAndWatermarks(???)
.flatMap(_.toUniversal: Option[PersonTransactionUniversal])
aTransactions
.union(bTransactions)
.union(cTransactions)
.keyBy(new PersonTransactionKeySelector)
.window(EventTimeSessionWindows.withGap(Time.seconds(config.gapEndSeconds)))
.allowedLateness(Time.seconds(config.latenessSeconds))
.aggregate(new PersonTransactionAggregateFunction)
.filter(new PersonTransactionFilterFunction(config.groupMaxSize))
.flatMap(new PersonTransactionFlatMapFunction(config.gapStartSeconds * 1000))
.addSink(new DiscardingSink[PersonTransactionsGroup])
val backend = new RocksDBStateBackend(config.checkpointsDirectory, true)
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
env.setStateBackend(backend: StateBackend)
env.execute(getClass.getSimpleName)
Вопрос:
- Моя реализация неверна для этого варианта использования?
- Есть что-то, что я пропустил?
- Есть ли какая-нибудь другая оптимизация, которую я могу попробовать?
- У меня есть проблемы с нахождением узкого места в этом сценарии, есть ли какие-то советы?
Спасибо
PS Постер с первого раза, будьте добры, пожалуйста.
Редактировать 1:
Входные темы кафки разделены на стороне производителя с помощью той же функции, что и в keyBy.
Количество разделов равно или точно в 2 раза больше, чем параллелизм потока.
Разделов на топи c имеют аналогичное количество событий (отклонение ~ 5-10%).
Темы заполнены различным количеством событий (A имеет в 10 раз больше событий, чем B, B имеет в 1000 раз больше событий, чем C), но на самом деле это не должно быть проблемой.