У меня есть следующая реализация
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues(String::toUpperCase)
.groupByKey()
.reduce((String value1, String value2) -> value1 + value2,
TimeWindows.of(1000),
"windowStore")
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print();
return stream;
}
, и я хотел бы в случае любой ошибки продолжать повторную попытку, пока сообщение не будет отправлено.Мне известна аннотация Retryable, но я все равно не вижу, как обработать выданное исключение и повторить попытку.