Я пытаюсь использовать хранилище состояний для дедупликации сообщений, полученных топологией kafka-streams (т.е. дублирование на основе некоторого ключа дедупликации, производного от бизнеса, в случае, если производитель дублирует сообщения в течение длительного периода времени).
Я замечаю, что если я добавлю значение в хранилище ключей во время шага преобразования, а затем выброшу исключение на последующем шаге, подписка откатится до предыдущей контрольной точки, но хранилище состояний сохранит свои значения, что кажется неверным.
Существует ли «правильный» способ использования хранилища состояний в топологии, при котором состояние откатывается после того, как топология выдает исключение?
nb это поведение реплицируется даже с точно-семантика однократной доставки
Например (упрощенно).
Толоплогия:
streamsBuilder
.addStateStore(storeBuilder)
.<String, MessageType>stream("input-topic")
.transform(() -> new Deduplicator(storeName))
.map(mapper::explode) //just throws an exception
.to(output-topic);
Трансформатор дедупликации:
public KeyValue<String, MessageType> transform(String key, MessageType value) {
String transactionId = getTransactionId();
boolean isDuplicate;
try (WindowStoreIterator<String> timeIterator = deduplicationStore.fetch(transactionId, calculateWindowStart(), calculateWindowEnd())){
isDuplicate = timeIterator.hasNext();
}
if (isDuplicate(transactionId)) {
return null;
}
deduplicationStore.put(transactionId, transactionId, Instant.now().toEpochMilli());
return KeyValue.pair(key, value);
}