Потоки Kafka: семантика "точно один раз" для сообщений, сгенерированных в функции пунктуатора - PullRequest
1 голос
/ 03 апреля 2019

Я хочу использовать Kafka Streams Processor API и генерировать некоторые сообщения каждую минуту в запланированной функции пунктуатора .Может ли Kafka Streams гарантировать, что эти сообщения будут записаны в выходную тему ровно один раз?

Я понимаю, что в Kafka Streams возможна обработка только один раз, потому что она выполняет одну транзакцию из следующих операций:

  1. Фиксировать смещение для входной темы
  2. Записать результат в выходную тему

Распространяется ли эта концепция на функции пунктуатора в API процессора, для которых нет связанныхвходное сообщение нуждается в коммите?

Например, эта функция пунктуатора выполняет итерации по элементам в хранилище состояний значения ключа .Каждый элемент удаляется из хранилища и пересылается в нисходящем направлении:

override def punctuate(timestamp: Long) : Unit =
  store.all.asScala.foreach { keyValue =>
      store.delete(keyValue.key)
      context.forward(keyValue.key, keyValue.value)
  }

Каждое сообщение в хранилище должно появляться в выходной теме ровно один раз, даже в случае сбоя процессора и перезапуска.

Предположим, что магазин является постоянным;это поддержано темой журнала изменений kafka.Пунктуатор запланирован каждую минуту настенных часов.Я настроил processing.guarantee=exactly_once в моей конфигурации.

1 Ответ

2 голосов
/ 04 апреля 2019

Точно однажды семантика также применима, если вы используете Punctuator.

Под капотом использования хранилища состояний понимается запись в тему журнала изменений (даже удаление - запись сообщения с некоторым ключом и значением null)

В вашем случае Kafka Streams будет читать сообщения из некоторой входной темы и записывать в выходную тему и в некоторую тему журнала изменений (операция в хранилище состояний).

Если вы включите ровно один раз в Kafka Streams, он будет работать в режиме транзакция .Используя транзакцию - атомарную многосегментную запись - Kafka Streams гарантирует, что при выполнении фиксации смещения результаты записывались в выходную тему, а хранилище состояний также мигало в теме журнала изменений на брокерах.Вышеуказанные операции являются атомарными, поэтому, если одна из них не удастся, приложение обработает сообщения с предыдущей позиции смещения.Все вышеперечисленное будет работать, потому что Processor::process и Punctuator::punctuate(...) выполняются в одном потоке для определенного раздела.

Более подробную информацию можно найти:

...