Kafka Обработка ошибок: Processor.output (). Send (message, kafkaTimeoutInMS) всегда возвращает true и его асинхронность - PullRequest
0 голосов
/ 12 октября 2018

Может быть, эта проблема уже сообщена и решена. Я не нашел решения и каких-либо открытых вопросов, которые говорят об этом, поэтому создаю новую.Я пытаюсь обработать ошибку при публикации данных в теме Кафки.С помощью kafka spring steam мы нажимаем на kafka с помощью этого метода

if (processor.output().send(messsage , kafkaTimeoutInMS) && acknowledgment != null)
                {
                    LOGGER.debug("Acknowledgment provided");
                    LOGGER.info("Sending to Kafka successful");
                    acknowledgment.acknowledge();
                }
                else
                {
                    LOGGER.error("Sending to Kafka failed", message);
                }

Send () всегда возвращает true, я пытался остановить руководство kafka во время работы в режиме отладки, но все равно возвращает true.Я прочитал, что он асинхронный.

Я пробовал установить

bindings: output: producer: sync: true

Это не помогло.Но я вижу некоторую ошибку, которую я не могу использовать в своей логике, чтобы решить, есть ли сбой или успех.

Мы подтверждаем вручную, поэтому мы должны подтверждать, только когда он успешно отправлен в тему, и нам нужно зарегистрировать всенеудачные сообщения.Есть предложения?

1 Ответ

0 голосов
/ 12 октября 2018

Полагаю, вы неправильно поняли, как работает весенний облачный поток.В качестве фреймворка существует определенный договор между пользователем и фреймворком, и когда дело доходит до обмена сообщениями, автоматические попытки, повторные попытки, DLQ и многие другие аспекты обрабатываются автоматически, чтобы пользователь не сталкивался с этим вручную (как выпытаюсь сделать).

Подумайте о том, чтобы потратить немного времени и изучить руководство пользователя - https://docs.spring.io/spring-cloud-stream/docs/Fishtown.M3/reference/htmlsingle/

Кроме того, вот самый базовый пример, демонстрирующий типичное взаимодействие пользователя (разработчика) с фреймворком.Как видите, все, что вы делаете, это реализует простой обработчик, который получает и возвращает часть данных.Остальное (фактическое получение от Kafka и отправка в Kafka или любую другую систему обмена сообщениями) обрабатывается связывателями, предоставленными платформой.

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication { 

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class);
    }


    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String echo(String message) {
        return message;
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...