Производитель Flink Kafka генерирует исключения при публикации потока с ключами - PullRequest
1 голос
/ 10 октября 2019

У меня проблемы с записью потока с ключами из подзадач приемника в тему вывода kafka.

Задание имеет вид: source -> filter -> keyby (id) -> flatmap -> sink

Исключения поступают от производителя кафки и приводят к контрольной точке для тайм-аута:

  • FlinkKafkaException: Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time

  • FlinkKafkaException: Failed to send data to Kafka: Expiring 16 record(s) for mytopic-11:120000 ms has passed since batch creation

Задание попадает в аварийный цикл с вышеуказанными исключениями и иногда кратковременно восстанавливается перед повторным аварийным сбросом. Я считаю, что проблема здесь заключается в том, что я использую ключи для определения выходных разделов, что приводит к тому, что подзадачи P-приемника для каждого вентилятора выводят записи в N выходных разделов. В идеале каждая подзадача должна выполнять запись только в один раздел.

Задание имеет следующие ограничения / свойства:

1: после того, как ключ был записан в раздел раздела выходной кафки, ему необходимовсегда записываться в один и тот же раздел kafka в будущем

2: параллелизм подзадачи приемника будет изначально равен числу выходных разделов

3: я должен быть в состоянии увеличить параллелизм вбудущее без нарушения # 1

4: я никогда не буду добавлять новые разделы в тему вывода kafka

Если parallelism == partitions, то я считаю, что FlinkFixedPartitioner будет хорошим решением. Однако я не думаю, что он будет уважать исходное отображение ключа-> раздела, если я позже увеличу параллелизм, так как он выбирает выходной раздел, используя эту схему.

Есть ли метод, который я мог бы использовать здесь, чтобы удовлетворить этиограничения? Возможно, настройка параметров производителя kafka, другой способ разделения потока с ключами или что-то еще?

1 Ответ

1 голос
/ 11 октября 2019

Вы предполагаете, что логика разделения, используемая Flink, такая же, как логика разделения, используемая Kafka. Вполне возможно (и это то, что я подозреваю, что происходит), что при наличии 4 ключей A, B, C и D Flink отправит A и B в один экземпляр приемника, а C и D - в другой экземпляр приемника. Тогда Кафка, вероятно, использует другую логику разделения, которая отправляет A и C в один раздел, а B и D. записываются в другой.

Кажется, Flink не хочет показывать группу ключей для данного ключа, ноесли ваш параллелизм для приемника такой же, как и количество разделов Kafka, то вы должны иметь возможность использовать task_id экземпляра приемника в пользовательской логике разбиения Kafka. Это немного грубая сила, но она должна делать то, что ты хочешь делать.

Если подумать еще немного, вы также можете написать собственный разделитель для Flink, который использует ту же логику, что и пользовательский разделитель для вашей темы Kafka. Это позволит справиться с масштабированием до большего количества экземпляров раковины.

...