У меня сейчас проблема с обработкой исключений, возникающих при обработке сообщений в Spring Cloud Streams (Elmhurst.RELEASE).Когда мое приложение выдает исключение в основном методе обработки:
@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String process(String message) {
externalService();
return message.toUpperCase();
}
private void externalService() {
throw new RuntimeException("An external call failed");
}
}
Я, кажется, совершенно не могу перехватить это исключение на любом канале ошибки.Прочитав документацию, я ожидал, что смогу получить ErrorMessage либо на глобальном канале errorChannel, либо на конкретном канале input.myGroup.errors.
Я пытался использовать прослушиватели Spring Integrations:
@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
А также слушатели Spring Cloud Streams:
@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
безуспешно.Я также поиграл с различными комбинациями настроек конфигурации, такими как «errorChannelEnabled: true», «max-retries: 1» и т. Д., И у меня не было возможности отловить ошибку, выдаваемую моим процессором (используя точку отладки при возврате)сообщение "проверить).Я даже не получаю никаких сообщений об ошибках в теме при настройке «bindings.error.destination: myErrorTopic», как предлагается в документации SCS.
Единственное, что, похоже, работает вообще, это «enableDlq: true».«на уровне конфигурации связующего.Однако это не отвечает важным потребностям в возможности определения исходного исключения, поскольку то, что публикуется в DLQ, имеет только заголовок с полной трассировкой стека.Я не хочу анализировать трассировку стека, чтобы выяснить тип или сообщение исходного исключения.
Есть ли лучший подход, который я должен использовать здесь?Или я совершаю какую-то глупую ошибку?Моя общая цель - отправлять сообщения об исключениях с фактическим типом исключения и сообщениями об исключениях в DLQ.
Конечно, я мог бы размещать операторы try / catch вокруг каждого метода Processor / Source / Sink, а затем вручнуюмаршрут к другому связанному каналу, но это сильно умаляет ценностное предложение инфраструктуры SCS в моем уме.
Я нашел этот пример пользовательской обработки DLQ как часть this ранее StackOverflow вопрос, который, казалось бы, отвечал моим потребностям.Однако, похоже, что это совсем не работает с SCS Elmhurst / 2.0 и связывателем Spring Kafka, даже после переноса конфигурации в работу для обновленного SCS.
EDIT Я добавил Github репозиторий , воспроизводящий мою ошибку, поскольку копирование того же основного кода, что и ответ Гэри, похоже, не работает.Я начинаю задаваться вопросом, является ли это проблемой зависимости POM, конфигурации или связывателя Кафки.В этом репозитории используется тот же основной код, что и в ответе Гари, так как я думаю, что немного проще увидеть проблему и отладить.
РЕДАКТИРОВАТЬ ПОСЛЕ ОТВЕТА ГАРИ Я принял ответ Гари, когда он решаетМоя первоначальная проблема (обойти текущую проблему фреймворка, когда в конфигурации есть связующие среды).Однако сообщения DLQ оказались довольно бесполезными для моего случая.Я закончил тем, что подписался на "errorChannel", как только я получил это на ответ Гэри, и оттуда создал специальное сообщение об ошибке, которое я отправил на обычный связанный канал SCS.