Spring Integration + SQS - повторная попытка исключения не работает - PullRequest
1 голос
/ 04 апреля 2019

Я работаю над интеграцией Spring Integration с очередью AWS SQS.

У меня проблема, когда мой метод, помеченный @ServiceActivator, генерирует исключение. Похоже, что в таких случаях сообщение удаляется из очереди в любом случае. Я настроил MessageDeletionPolicy на ON_SUCCESS в SqsMessageDrivenChannelAdapter.

Вот моя конфигурация канала / адаптера https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java

Я пытался сделать то же самое, используя аннотацию @SqsListener, и сообщения не удаляются, как ожидалось.

Я создал мини-приложение Spring Boot, чтобы продемонстрировать эту проблему: https://github.com/sdusza1/spring-integration-sqs

Пожалуйста, помогите:)

1 Ответ

1 голос
/ 04 апреля 2019

Ваша конфигурация такая:

@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
    adapter.setOutputChannel(inboundChannel());
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
    adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
    return adapter;

}

Где inboundChannel такой:

 @Bean
    public QueueChannel inboundChannel() {
        return new QueueChannel();
 }

Итак, это очередь , поэтому асинхронное сообщение и сообщение из этой очереди обрабатываются в отдельном потоке TaskScheduler, который опрашивает канал такого типа в соответствии с вашей конфигурацией PollerMetadata. В этом случае любые ошибки потребителя также генерируются в этом потоке и не достигают SqsMessageDrivenChannelAdapter для ожидаемой обработки ошибок.

Технически это полностью отличается от вашего @SqsListener опыта, который действительно вызывается непосредственно в потоке контейнера, и поэтому применяется его обработка ошибок.

Или вам нужно пересмотреть свою логику, как бы вы хотели обрабатывать ошибки в этом отдельном потоке, или просто не использовать QueueChannel сразу после SqsMessageDrivenChannelAdapter и позволить ему генерировать и обрабатывать ошибки в базовом контейнере прослушивателя SQS как это в случае @SqsListener.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...