Хранение потоков Kafka, не отражающее текущее состояние в KTable - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть тема, где публикуются события о файлах.По этой теме создается группировка с таблицей с последним событием для каждого файла.Вроде как "источник света событий".Процесс выглядит следующим образом:

  • Получить новый файл
  • Запросить его последний статус
  • Более 3 ошибок -> прервать
  • Состояние обновленияв PENDING и повторите попытку + 1 (используйте KafkaProducer для создания нового события и дождитесь обратного вызова)
  • Файл процесса
  • Запрос последнего статуса
  • Установить состояние как SUCCESSFUL (использоватьтот же KafkaProducer для создания нового события и ожидания обратного вызова)

Немного кода:

Создание нового события:

event -> {
        CompletableFuture<RecordMetadata> retFuture = new CompletableFuture<>();
        topicProducer.send(new ProducerRecord<>(applicationTopicName, flowId, event), (recordMetaData, exception) -> {
            if (exception != null) {
                retFuture.completeExceptionally(exception);
            } else {
                retFuture.complete(recordMetaData);
            }
        });

        return retFuture;
    }

Ихранилище, которое запрашивается позже:

        HistoryEventSerde historyEventSerde = new HistoryEventSerde();
    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
            historyEventSerde));


    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
                    .withKeySerde(new HistoryEventKeySerde())
                    .withValueSerde(new HistoryEventSerde())
            )
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));

Теперь для файла примера (давайте назовем его file1) это происходит:

  • Запрос его последнего статуса -> null
  • более 3 ошибок?-> Нет
  • создать новое событие с RetryCount 0 и состоянием PENDING
  • файл процесса
  • Запросить его последнее состояние -> Количество попыток PENDING 0 (это нормально)
  • Установить состояние на УСПЕХ и опубликовать на kafka

В первый раз все было хорошо.Теперь давайте предположим, что тот же файл снова входит в процесс:

  • Запросить его последнее состояние -> УСПЕШНО, 0 попыток
  • Больше 3 ошибок?-> Нет
  • Создание нового события с числом попыток 1 и состоянием PENDING
  • файл процесса
  • Запрос последнего статуса -> SUCCESSFUL, 0 повторных попыток <---- Проблема,это не последнее состояние </li>

В чем может быть проблема, что это происходит?У меня было предположение, что моя операция уменьшения не подходит (как в: События не в том же порядке), однако с тех пор я добавил временную метку к своим событиям и уменьшил их таким образом, чтобы остались только последние, нопроисходит то же поведение, что и выше.Что я делаю не так?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...