Кафки потребляют из 2 тем и принимают одинаковое количество сообщений - PullRequest
1 голос
/ 08 июля 2019

Я выполнил определенное требование и хотел бы услышать мнение людей и, конечно, не изобретать велосипед.

У меня есть 2 темы Кафки - A и B.

A и B будут заполнены сообщениями с разной скоростью приема. Например: A может быть сначала заполнен 10K-сообщениями, а затем B. Процесс глотания - это то, что мы не можем контролировать. Для нас это как сторонняя апстрим-система.

Мне нужно взять сообщения из этих двух тем и смешать их в равной пропорции. Например: если настроенный размер равен 50. Затем я должен взять 50 из A и 50 из B (или подождать, пока он у меня не появится), а затем отправить его в другую тему кафки как 100 (с равными пропорциями A и B). .

Мне было интересно, как лучше это решить? Хотя я смотрел на семантику соединений KStreams и KTables, я не совсем уверен, что это допустимый вариант использования для объединения (поскольку в сообщении нет ключа, соединяющего эти 2 потока или таблицы).

Можно ли это сделать без Kafka Streams? Потребитель Vanilla Kafka (возможно, с некоторыми дозировками?) Мысли?

1 Ответ

2 голосов
/ 08 июля 2019

С помощью Spring создайте 2 @KafkaListener s, один для A, один для B;установите для режима подтверждения контейнера значение MANUAL и добавьте Acknowledgment к сигнатуре метода.

В каждом приемнике накапливайте записи до тех пор, пока не получите 50, затем приостановите контейнер приемника (чтобы Кафка больше не отправлял,но потребитель остается живым).

Возможно, вам придется установить max.poll.records на 1, чтобы лучше контролировать потребление.

Когда у вас есть 50 в каждом;объединить и отправить.

Зафиксируйте смещения, вызвав acknowledge() последнего Acknowledgment, полученного в A и B.

Возобновите контейнеры.

Повторите.

Отсрочка коммитов смещения позволит избежать потери записи в случае сбоя сервера, пока вы находитесь в стадии накопления.

Если у вас много сообщений в обеих темах, вы можете пропустить паузу / возобновлениечасть.

...