KStream повторяется в случае ошибки - PullRequest
0 голосов
/ 31 мая 2018

У меня есть следующая реализация

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues(String::toUpperCase)
                .groupByKey()
                .reduce((String value1, String value2) -> value1 + value2,
                        TimeWindows.of(1000),
                        "windowStore")
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print();

        return stream;
    }

, и я хотел бы в случае любой ошибки продолжать повторную попытку, пока сообщение не будет отправлено.Мне известна аннотация Retryable, но я все равно не вижу, как обработать выданное исключение и повторить попытку.

Ответы [ 2 ]

0 голосов
/ 31 мая 2018

См. StreamsConfig.RETRIES_CONFIG

"Установка значения больше нуля приведет к тому, что клиент отправит любой запрос, который завершится неудачно с потенциально кратковременной ошибкой.";

Сам клиент kafka будет повторять

также

общедоступная статическая конечная строка RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";

общедоступная статическая конечная строка RETRY_BACKOFF_MS_DOC = "Theвремя ожидания перед попыткой повторить неудачный запрос к заданному разделу темы. Это позволяет избежать повторной отправки запросов в замкнутом цикле при некоторых сценариях сбоя. ";

0 голосов
/ 31 мая 2018

Если вы просто хотите повторить попытку для этой части кода, вы можете написать ее вручную:

Something smt = kStreamBuilder.stream("streamingTopic1");
            .mapValues(String::toUpperCase)
            .groupByKey()
            .reduce((String value1, String value2) -> value1 + value2,
                    TimeWindows.of(1000),
                    "windowStore")
            .toStream()
            .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
            .filter((i, s) -> s.length() > 40)
            .collectToSomething();

while (true) {
    try {
        return smt.to("streamingTopic2");
    } catch (Exception ex) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // log
        }
    }
}

В конечном счете, вы не хотите делать while навсегда, так что выпо желанию можно создать ограничение на повтор

...