Я работаю в сценарии, когда дублированные сообщения могут поступать потребителю (приложение KStream).Чтобы использовать типичный случай, давайте предположим, что это OrderCreatedEvent и KStream имеет логику, которая обрабатывает заказ.У события есть идентификатор заказа, который поможет мне идентифицировать дублированные сообщения.
Я хочу сделать следующее:
1) Добавить каждый заказ в постоянное хранилище состояний
2) При обработке сообщения в KStream, запросите хранилище состояний, чтобы проверить, было ли уже получено сообщение, ничего не делая в этом случае.
val persistentKeyValueStore = Stores.persistentKeyValueStore("order-store")
val stateStore: Materialized<Int, Order, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Order>(persistentKeyValueStore)
.withKeySerde(intSerde)
.withValueSerde(orderSerde)
val orderTable: KTable<Int, Order> = input.groupByKey(Serialized.with(intSerde, orderSerde))
.reduce({ _, y -> y }, stateStore)
var orderStream: KStream<Int, Order> = ...
orderStream.filter { XXX }
.map { key, value ->
processingLogic()
KeyValue(key, value)
}...
В бите filter { XXX }
я хотел бызапросить в хранилище состояний проверить наличие идентификатора заказа (предположим, что он используется в качестве ключа хранилища ключей), отфильтровывая уже обработанные заказы (присутствующие в хранилище состояний).
Мой первый вопрос : как я могу запросить хранилище состояний в DSL KStream, например, внутри операции фильтрации.
Второй вопрос :в этом случае, как я могу обработать прибытие нового (ранее не обработанного сообщения)?Если таблица KTable сохраняет порядок в хранилище состояний ДО выполнения KStream orderStream, сообщение уже будет в хранилище.Их следует добавлять только после завершения обработки.Как я могу это сделать?Вероятно, я не должен использовать KTable для этого, но что-то вроде:
orderStream.filter { keystore.get(key) == null }
.map { key, value ->
processingLogic()
KeyValue(key, value)
}
.foreach { key, value ->
keystore.put(key, value);
}