org.springframework.core.task.TaskRejectedException при прослушивании очереди SQS? - PullRequest
1 голос
/ 18 марта 2019

Я создаю базовый рабочий процесс потребления сообщений от SQS с @SqsListener. Это работает нормально, но я постоянно получаю тонны подобных сообщений:

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@372b568 [Запуск, размер пула = 3, активные потоки = 3, поставленные в очередь задачи = 0, выполненные задачи = 0]] не принять задание: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@4c30c2f9 в org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:317) ~ [spring-context-5.1.4.RELEASE.jar: 5.1.4.RELEASE] в org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer $ AsynchronousMessageListener.run (SimpleMessageListenerContainer.java:286) ~ [spring-cloud-aws-messaging-2.1.0.RELEASE.jar: 2.1.0.RELEASE] в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) [na: 1.8.0_171] в java.util.concurrent.FutureTask.run (FutureTask.java:266) [na: 1.8.0_171] в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) [na: 1.8.0_171] в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) [na: 1.8.0_171] на java.lang.Thread.run (Thread.java:748) [na: 1.8.0_171] Вызвано: java.util.concurrent.RejectedExecutionException: задача org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@4c30c2f9 отклонено от java.util.concurrent.ThreadPoolExecutor@372b568 [Выполняется, размер пула = 3, активных потоков = 3, задач в очереди = 0, выполненных задач = 0] в java.util.concurrent.ThreadPoolExecutor $ AbortPolicy.rejectedExecution (ThreadPoolExecutor.java:2063) ~ [na: 1.8.0_171] в java.util.concurrent.ThreadPoolExecutor.reject (ThreadPoolExecutor.java:830) [na: 1.8.0_171] в java.util.concurrent.ThreadPoolExecutor.execute (ThreadPoolExecutor.java:1379) [na: 1.8.0_171] в org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:314) ~ [spring-context-5.1.4.RELEASE.jar: 5.1.4.RELEASE] ... 6 общих кадров опущена

Моя конфигурация bean:

@EnableSqs
@Configuration
public class AmazonSqsConfiguration {

    @Value("${aws.sqs.accessKey}")
    private String accessKey;

    @Value("${aws.sqs.secretKey}")
    private String secretKey;

    @Value("${aws.sqs.region}")
    private String region;

    @Value("${aws.sqs.url}")
    private String url;

    @Bean
    public AmazonSQSAsync amazonSqs() {
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
        return AmazonSQSAsyncClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(url, region))
                .withCredentials(credentialsProvider)
                .build();
    }

}

Мой потребитель следующий:

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void processSubscription(String xmlNotification) {/* Message processor */}

Можно ли удалить их, переконфигурировав @Bean? В чем причина проблемы и как с ней бороться?

Я пытался найти решение с помощью естественного поиска и столкнулся с следующим ответом . Это не работает для меня, так как у меня нет JMS. Я не смог отладить, потому что я даже не знаю, что отлаживать.

1 Ответ

2 голосов
/ 20 марта 2019

Я нашел билет на spring-cloud-aws , связанный с поведением, с которым я столкнулся. Я также нахожу соответствующий StackOverflow вопрос .

Поэтому решение, которое работало для меня, было следующим:

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    factory.setAutoStartup(true);
    factory.setWaitTimeOut(20);

    return factory;
}
...