У меня есть тема, где публикуются события о файлах.По этой теме создается группировка с таблицей с последним событием для каждого файла.Вроде как "источник света событий".Процесс выглядит следующим образом:
- Получить новый файл
- Запросить его последний статус
- Более 3 ошибок -> прервать
- Состояние обновленияв PENDING и повторите попытку + 1 (используйте KafkaProducer для создания нового события и дождитесь обратного вызова)
- Файл процесса
- Запрос последнего статуса
- Установить состояние как SUCCESSFUL (использоватьтот же KafkaProducer для создания нового события и ожидания обратного вызова)
Немного кода:
Создание нового события:
event -> {
CompletableFuture<RecordMetadata> retFuture = new CompletableFuture<>();
topicProducer.send(new ProducerRecord<>(applicationTopicName, flowId, event), (recordMetaData, exception) -> {
if (exception != null) {
retFuture.completeExceptionally(exception);
} else {
retFuture.complete(recordMetaData);
}
});
return retFuture;
}
Ихранилище, которое запрашивается позже:
HistoryEventSerde historyEventSerde = new HistoryEventSerde();
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
.withKeySerde(new HistoryEventKeySerde())
.withValueSerde(new HistoryEventSerde())
)
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
Теперь для файла примера (давайте назовем его file1) это происходит:
- Запрос его последнего статуса -> null
- более 3 ошибок?-> Нет
- создать новое событие с RetryCount 0 и состоянием PENDING
- файл процесса
- Запросить его последнее состояние -> Количество попыток PENDING 0 (это нормально)
- Установить состояние на УСПЕХ и опубликовать на kafka
В первый раз все было хорошо.Теперь давайте предположим, что тот же файл снова входит в процесс:
- Запросить его последнее состояние -> УСПЕШНО, 0 попыток
- Больше 3 ошибок?-> Нет
- Создание нового события с числом попыток 1 и состоянием PENDING
- файл процесса
- Запрос последнего статуса -> SUCCESSFUL, 0 повторных попыток <---- Проблема,это не последнее состояние </li>
В чем может быть проблема, что это происходит?У меня было предположение, что моя операция уменьшения не подходит (как в: События не в том же порядке), однако с тех пор я добавил временную метку к своим событиям и уменьшил их таким образом, чтобы остались только последние, нопроисходит то же поведение, что и выше.Что я делаю не так?