Повторная обработка сообщения при возникновении ошибки во время его обработки в Kafka Stream - PullRequest
0 голосов
/ 28 мая 2019

У меня есть простое приложение Spring, основанное на Kafka Streams, которое принимает сообщение из входящей темы, выполняет преобразование map и печатает это сообщение.KStream настроен следующим образом

@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
         PrintAction printAction, String topicName) {
    KStream<String, JsonNode> source = builder.stream(topicName,
                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
    // @formatter:off
    source
        .map(myTransformer)
        .foreach(printAction);
    // @formatter:on
    return source;
}

Внутри MyTransformer Я звоню во внешнюю микросервисную службу, которая в данный момент может быть недоступна.Если вызов не удался (обычно выбрасывает RuntimeException), я не могу выполнить свое преобразование.

Вопрос здесь заключается в том, чтобы снова обработать сообщение в приложении Streams, если во время предыдущей обработки произошла ошибка?

Исходя из моих текущих исследований, я не могу этого сделать, единственная возможность, которую я имею, - это вставить сообщение в тему мертвых букв и попытаться обработать его в будущем, если оно снова не удастся, я нажимаю на него.снова в DLT и повторите попытку.

1 Ответ

1 голос
/ 28 мая 2019

если во время обработки Kafka Streams произойдет какое-либо необработанное исключение, ваш поток изменит статус на ERROR и прекратит использовать входящие сообщения для раздела, в котором произошла ошибка. Вы должны ловить исключения самостоятельно. Повторные попытки могут быть выполнены либо: 1) с помощью Spring RetryTemplate для вызова внешнего микросервиса (но имейте в виду, что вы будете иметь задержки при получении сообщений из определенного раздела), либо 2) вставьте сообщение об ошибке в другую тему для последующей повторной обработки (как ты предложил)

...