Я хочу использовать Kafka Streams Processor API и генерировать некоторые сообщения каждую минуту в запланированной функции пунктуатора .Может ли Kafka Streams гарантировать, что эти сообщения будут записаны в выходную тему ровно один раз?
Я понимаю, что в Kafka Streams возможна обработка только один раз, потому что она выполняет одну транзакцию из следующих операций:
- Фиксировать смещение для входной темы
- Записать результат в выходную тему
Распространяется ли эта концепция на функции пунктуатора в API процессора, для которых нет связанныхвходное сообщение нуждается в коммите?
Например, эта функция пунктуатора выполняет итерации по элементам в хранилище состояний значения ключа .Каждый элемент удаляется из хранилища и пересылается в нисходящем направлении:
override def punctuate(timestamp: Long) : Unit =
store.all.asScala.foreach { keyValue =>
store.delete(keyValue.key)
context.forward(keyValue.key, keyValue.value)
}
Каждое сообщение в хранилище должно появляться в выходной теме ровно один раз, даже в случае сбоя процессора и перезапуска.
Предположим, что магазин является постоянным;это поддержано темой журнала изменений kafka.Пунктуатор запланирован каждую минуту настенных часов.Я настроил processing.guarantee=exactly_once
в моей конфигурации.