Откат государственного магазина Kafka на исключение топологии - PullRequest
0 голосов
/ 16 мая 2018

Я пытаюсь использовать хранилище состояний для дедупликации сообщений, полученных топологией kafka-streams (т.е. дублирование на основе некоторого ключа дедупликации, производного от бизнеса, в случае, если производитель дублирует сообщения в течение длительного периода времени).

Я замечаю, что если я добавлю значение в хранилище ключей во время шага преобразования, а затем выброшу исключение на последующем шаге, подписка откатится до предыдущей контрольной точки, но хранилище состояний сохранит свои значения, что кажется неверным.

Существует ли «правильный» способ использования хранилища состояний в топологии, при котором состояние откатывается после того, как топология выдает исключение?

nb это поведение реплицируется даже с точно-семантика однократной доставки

Например (упрощенно).

Толоплогия:

streamsBuilder
    .addStateStore(storeBuilder)
    .<String, MessageType>stream("input-topic")
    .transform(() -> new Deduplicator(storeName))
    .map(mapper::explode) //just throws an exception
    .to(output-topic);

Трансформатор дедупликации:

public KeyValue<String, MessageType> transform(String key, MessageType value) {

    String transactionId = getTransactionId();
    boolean isDuplicate;

    try (WindowStoreIterator<String> timeIterator = deduplicationStore.fetch(transactionId, calculateWindowStart(), calculateWindowEnd())){
        isDuplicate = timeIterator.hasNext();
    }

    if (isDuplicate(transactionId)) {
        return null;
    }

    deduplicationStore.put(transactionId, transactionId, Instant.now().toEpochMilli());

    return KeyValue.pair(key, value);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...