Получить последние значения из темы о потребительском запуске, а затем продолжить в обычном - PullRequest
2 голосов
/ 18 апреля 2019

У нас есть производитель Kafka, который генерирует сообщения с очень высокой частотой для тем, время хранения которых = 10 часов.Эти сообщения являются обновлениями в реальном времени, а используемый ключ - это идентификатор элемента, значение которого изменилось.Таким образом, тема выступает в качестве журнала изменений и будет иметь много дублирующих ключей.

Теперь мы пытаемся достичь того, чтобы потребитель Kafka запускался независимо от последнего известного состояния (новый потребитель, сбой,перезапуск и т. д.), он каким-то образом создаст таблицу с последними значениями всех ключей в теме, а затем продолжит прослушивание новых обновлений в обычном режиме, сохраняя минимальную нагрузку на сервер Kafka и позволяя потребителю выполнять большую частьработа.Мы попробовали много способов, и ни один из них не кажется лучшим.

Что мы пробовали:

1 тема журнала изменений + 1 компактная тема:

  1. Производитель отправляет то же самоесообщение в обе темы, завернутое в транзакцию, чтобы обеспечить успешную отправку.
  2. Потребитель запускает и запрашивает последнее смещение темы журнала изменений.
  3. Потребляет сжатую тему от начала до построения таблицы.
  4. Продолжает использовать журнал изменений после запрошенного смещения.

Минусы:

  • Наличие дубликатов в уплотненной теме - очень высокая вероятность дажес установкой максимально возможной частоты сжатия журналов.
  • x2 количество тем на сервере Kakfa.

KSQL:

В KSQL нам либо нужно переписать KTableв качестве темы, чтобы потребитель мог ее увидеть (Дополнительные темы), или нам потребуется, чтобы потребители выполнили KSQL SELECT с использованием KSQL Rest Server и запросили таблицу (не так быстро и производительно, как API-интерфейсы Kafka)).

API потребителя Kafka:

Потребитель запускается и использует тему с самого начала.Это сработало отлично, но потребитель должен использовать 10-часовой журнал изменений для построения последней таблицы значений.

Потоки Kafka:

Используя KTables следующим образом:

KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));

Kafka Streams создаст 1 тему на сервере Kafka для каждой таблицы KTable (с именем {consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog), что приведет к огромному количеству тем, поскольку у нас большое количество потребителей.

Из того, что мы попробовали, этоПохоже, нам нужно либо увеличить нагрузку на сервер, либо время запуска потребителя.Разве нет «идеального» способа достичь того, что мы пытаемся сделать?

Заранее спасибо.

1 Ответ

2 голосов
/ 18 апреля 2019

Используя KTables, Kafka Streams создаст 1 тему на сервере Kafka для каждого KTable, что приведет к огромному количеству тем, поскольку у нас большое количество потребителей.

Если вы просто читаете существующую тему в KTable (через StreamsBuilder#table()), то Kafka Streams не создает никаких дополнительных тем. То же самое для KSQL.

Было бы полезно, если бы вы могли уточнить, что именно вы хотите делать с таблицей KTable. Видимо, вы делаете что-то, что приводит к созданию дополнительных тем?

1 тема журнала изменений + 1 компактная тема:

Почему вы думали о двух разных темах? Обычно темы журнала изменений всегда должны быть сжаты. И учитывая ваше описание варианта использования, я не вижу причины, почему это не должно быть:

Теперь мы пытаемся добиться того, чтобы при запуске клиента Kafka независимо от последнего известного состояния (новый потребитель, сбой, перезапуск и т. Д.) Он каким-то образом создавал таблицу с последними значениями все ключи в теме, а затем продолжает слушать новые обновления в обычном режиме [...]

Следовательно, сжатие было бы очень полезно для вашего варианта использования. Это также предотвратит описанную вами проблему:

Потребитель запускается и потребляет тему с самого начала. Это сработало отлично, но потребитель должен использовать журнал изменений за 10 часов для построения таблицы последних значений.

Обратите внимание, что для восстановления последних значений таблицы все три Kafka Streams, KSQL и Kafka Consumer должны полностью прочитать основную тему таблицы (от начала до конца). Если эта тема НЕ уплотнена, это может занять много времени в зависимости от объема данных, настроек хранения темы и т. Д.

Из того, что мы попробовали, похоже, что нам нужно либо увеличить нагрузку на сервер, либо время запуска потребителя. Разве не существует «идеального» способа достичь того, что мы пытаемся сделать?

Не зная больше о вашем сценарии использования, особенно о том, что вы хотите делать с таблицами KTable после их заполнения, мой ответ будет:

  • Убедитесь, что «тема журнала изменений» также сжата.
  • Попробуйте сначала KSQL. Если это не удовлетворяет вашим потребностям, попробуйте Kafka Streams. Если это не удовлетворяет вашим потребностям, попробуйте Kafka Consumer.

Например, я бы не использовал Kafka Consumer, если предполагается, что он выполняет какую-либо обработку с сохранением данных «таблицы», поскольку в Kafka Consumer отсутствует встроенная функциональность для отказоустойчивой обработки с учетом состояния.

...