Я пытаюсь написать простое приложение Kafka Streams (нацеленное на Kafka 2.2 / Confluent 5.2), чтобы преобразовать тему ввода с семантикой хотя бы один раз в поток вывода ровно один раз.Я хотел бы закодировать следующую логику:
- Для каждого сообщения с заданным ключом:
- Считать метку времени сообщения из строкового поля в значении сообщения
- Получить наибольшую временную метку, которую мы ранее видели для этого ключа, из локального хранилища состояний
- Если временная метка сообщения меньше или равна отметке времени в хранилище состояний, ничего не генерировать
- Если отметка времени больше, чем отметка времени в хранилище состояний, или ключ не существует в хранилище состояний, отправьте сообщение и обновите хранилище состояний с помощью ключа / отметки времени сообщения
(Это гарантированно даст правильные результаты, основанные на гарантиях упорядочения, которые мы получаем от вышестоящей системы; я не пытаюсь здесь что-либо сделать магическим.)
Сначала я подумал, что мог бы сделать это с оператором Kafka Streams flatMapValues
, который позволяет сопоставить каждое входное сообщение с нулем или несколькими выходными сообщениями с помощью одной и той же клавиши.Однако эта документация явно предупреждает:
Это операция записи без записи (см. TransformValues (ValueTransformerSupplier, String ...) для преобразования значений с состоянием).
Звучит многообещающе, но в документации transformValues
не ясно, как выдавать ноль или одно выходное сообщение на входное сообщение.Если только это не пытается сказать // or null
в этом примере?
flatTransform
также выглядело несколько многообещающе, но мне не нужно манипулировать ключом, и, если возможно, я бы хотел избежатьперераспределение.
Кто-нибудь знает, как правильно выполнять этот вид фильтрации?