У нас есть производитель Kafka, который генерирует сообщения с очень высокой частотой для тем, время хранения которых = 10 часов.Эти сообщения являются обновлениями в реальном времени, а используемый ключ - это идентификатор элемента, значение которого изменилось.Таким образом, тема выступает в качестве журнала изменений и будет иметь много дублирующих ключей.
Теперь мы пытаемся достичь того, чтобы потребитель Kafka запускался независимо от последнего известного состояния (новый потребитель, сбой,перезапуск и т. д.), он каким-то образом создаст таблицу с последними значениями всех ключей в теме, а затем продолжит прослушивание новых обновлений в обычном режиме, сохраняя минимальную нагрузку на сервер Kafka и позволяя потребителю выполнять большую частьработа.Мы попробовали много способов, и ни один из них не кажется лучшим.
Что мы пробовали:
1 тема журнала изменений + 1 компактная тема:
- Производитель отправляет то же самоесообщение в обе темы, завернутое в транзакцию, чтобы обеспечить успешную отправку.
- Потребитель запускает и запрашивает последнее смещение темы журнала изменений.
- Потребляет сжатую тему от начала до построения таблицы.
- Продолжает использовать журнал изменений после запрошенного смещения.
Минусы:
- Наличие дубликатов в уплотненной теме - очень высокая вероятность дажес установкой максимально возможной частоты сжатия журналов.
- x2 количество тем на сервере Kakfa.
KSQL:
В KSQL нам либо нужно переписать KTableв качестве темы, чтобы потребитель мог ее увидеть (Дополнительные темы), или нам потребуется, чтобы потребители выполнили KSQL SELECT
с использованием KSQL Rest Server и запросили таблицу (не так быстро и производительно, как API-интерфейсы Kafka)).
API потребителя Kafka:
Потребитель запускается и использует тему с самого начала.Это сработало отлично, но потребитель должен использовать 10-часовой журнал изменений для построения последней таблицы значений.
Потоки Kafka:
Используя KTables следующим образом:
KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));
Kafka Streams создаст 1 тему на сервере Kafka для каждой таблицы KTable (с именем {consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog
), что приведет к огромному количеству тем, поскольку у нас большое количество потребителей.
Из того, что мы попробовали, этоПохоже, нам нужно либо увеличить нагрузку на сервер, либо время запуска потребителя.Разве нет «идеального» способа достичь того, что мы пытаемся сделать?
Заранее спасибо.