Как обрабатывать / перехватывать исключения, создаваемые в весенних облачных потоках (Elmhurst / 2.0) - PullRequest
0 голосов
/ 24 мая 2018

У меня сейчас проблема с обработкой исключений, возникающих при обработке сообщений в Spring Cloud Streams (Elmhurst.RELEASE).Когда мое приложение выдает исключение в основном методе обработки:

@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String process(String message) {
        externalService();
        return message.toUpperCase();
    }

    private void externalService() {
        throw new RuntimeException("An external call failed");
    }
}

Я, кажется, совершенно не могу перехватить это исключение на любом канале ошибки.Прочитав документацию, я ожидал, что смогу получить ErrorMessage либо на глобальном канале errorChannel, либо на конкретном канале input.myGroup.errors.

Я пытался использовать прослушиватели Spring Integrations:

@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}

А также слушатели Spring Cloud Streams:

@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}

безуспешно.Я также поиграл с различными комбинациями настроек конфигурации, такими как «errorChannelEnabled: true», «max-retries: 1» и т. Д., И у меня не было возможности отловить ошибку, выдаваемую моим процессором (используя точку отладки при возврате)сообщение "проверить).Я даже не получаю никаких сообщений об ошибках в теме при настройке «bindings.error.destination: myErrorTopic», как предлагается в документации SCS.

Единственное, что, похоже, работает вообще, это «enableDlq: true».«на уровне конфигурации связующего.Однако это не отвечает важным потребностям в возможности определения исходного исключения, поскольку то, что публикуется в DLQ, имеет только заголовок с полной трассировкой стека.Я не хочу анализировать трассировку стека, чтобы выяснить тип или сообщение исходного исключения.

Есть ли лучший подход, который я должен использовать здесь?Или я совершаю какую-то глупую ошибку?Моя общая цель - отправлять сообщения об исключениях с фактическим типом исключения и сообщениями об исключениях в DLQ.

Конечно, я мог бы размещать операторы try / catch вокруг каждого метода Processor / Source / Sink, а затем вручнуюмаршрут к другому связанному каналу, но это сильно умаляет ценностное предложение инфраструктуры SCS в моем уме.

Я нашел этот пример пользовательской обработки DLQ как часть this ранее StackOverflow вопрос, который, казалось бы, отвечал моим потребностям.Однако, похоже, что это совсем не работает с SCS Elmhurst / 2.0 и связывателем Spring Kafka, даже после переноса конфигурации в работу для обновленного SCS.

EDIT Я добавил Github репозиторий , воспроизводящий мою ошибку, поскольку копирование того же основного кода, что и ответ Гэри, похоже, не работает.Я начинаю задаваться вопросом, является ли это проблемой зависимости POM, конфигурации или связывателя Кафки.В этом репозитории используется тот же основной код, что и в ответе Гари, так как я думаю, что немного проще увидеть проблему и отладить.

РЕДАКТИРОВАТЬ ПОСЛЕ ОТВЕТА ГАРИ Я принял ответ Гари, когда он решаетМоя первоначальная проблема (обойти текущую проблему фреймворка, когда в конфигурации есть связующие среды).Однако сообщения DLQ оказались довольно бесполезными для моего случая.Я закончил тем, что подписался на "errorChannel", как только я получил это на ответ Гэри, и оттуда создал специальное сообщение об ошибке, которое я отправил на обычный связанный канал SCS.

1 Ответ

0 голосов
/ 24 мая 2018

Почему у вас return message;?Где вы ожидаете, что это пойдет?

Это прекрасно работает для меня ...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So50506586Application {

    public static void main(String[] args) {
        SpringApplication.run(So50506586Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(byte[] in) {
        throw new RuntimeException("fail");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void errors(ErrorMessage em) {
        System.out.println(em);
    }

}

ErrorMessage [payload = org.springframework.messaging.MessagingException: исключение, выдаваемое при вызовеcom.example.So50506586Application # listen [1 args];Вложенное исключение - java.lang.RuntimeException: сбой, failMessage = GenericMessage [payload = byte [3], заголовки = {kafka_offset = 0, ...

...

Причина:java.lang.RuntimeException: ошибка

...

РЕДАКТИРОВАТЬ

Это ошибка при использовании поддержки мульти-связывателя (черезenvironment материал).Канал глобальной ошибки не виден.

Это работает ...

spring:
  cloud:
      stream:
        bindings:
          input:
            destination: input
            group: myGroup7
            binder: kafka
#          error:
#            destination: errors
        kafka.bindings.input.consumer.autoCommitOnError: true
        kafka:
          binder:
            brokers: localhost:9092

(вам больше не нужны zkNodes с Elmhurst).

Кроме того, вы не должны 't использовать устаревшие вещи error.destination, поскольку сообщение, отправляемое в канал ошибок, теперь представляет собой объект ErrorMessage с большим количеством информации, поэтому используйте функцию dlq для автоматической публикации в теме.

Нам нужно удалитьэти устаревшие свойства назначения ошибок.

Это тоже не работает ...

@ServiceActivator(inputChannel = "input.myGroup.errors")
public void errors(ErrorMessage em) {
    System.out.println(em);
}

..., потому что активатор службы находится в другом контексте приложения с привязкой.

На данный момент единственное обходное решение - не использовать стиль конфигурации environment.У меня нет решения для вас, если вам нужна поддержка нескольких связывателей.

Кстати, с Elmhurst (и kafka 0.11 или выше) трассировка стека больше не встраивается в полезную нагрузку сообщений, отправляемых в DLQ., он отправляется как заголовок кафки.

...