У меня есть простое приложение Spring, основанное на Kafka Streams, которое принимает сообщение из входящей темы, выполняет преобразование map
и печатает это сообщение.KStream
настроен следующим образом
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}
Внутри MyTransformer
Я звоню во внешнюю микросервисную службу, которая в данный момент может быть недоступна.Если вызов не удался (обычно выбрасывает RuntimeException
), я не могу выполнить свое преобразование.
Вопрос здесь заключается в том, чтобы снова обработать сообщение в приложении Streams, если во время предыдущей обработки произошла ошибка?
Исходя из моих текущих исследований, я не могу этого сделать, единственная возможность, которую я имею, - это вставить сообщение в тему мертвых букв и попытаться обработать его в будущем, если оно снова не удастся, я нажимаю на него.снова в DLT и повторите попытку.