SQSListener с ThreadpoolExecutor - PullRequest
       12

SQSListener с ThreadpoolExecutor

0 голосов
/ 28 апреля 2018

В приведенном ниже примере я устанавливаю максимальный и основной размер пула равным 1. Однако сообщения не обрабатываются. Когда я включаю журнал отладки, я могу видеть сообщения, извлекаемые из SQS, но я предполагаю, что они не обрабатываются / удаляются. Однако когда я увеличиваю ядро ​​и максимальный размер пула до 2, сообщения, похоже, обрабатываются.

EDIT

Я считаю, что Spring может выделить поток для получателя, который считывает данные из очереди, и, следовательно, он не может выделить поток для слушателя, который обрабатывает сообщение. Когда я увеличил размер corepoolsize до 2, я увидел, что сообщения читаются из очереди. Когда я добавил другого слушателя (для очереди недоставленных сообщений), я столкнулся с той же проблемой - двух потоков было недостаточно, поскольку сообщения не обрабатывались. Когда я увеличил размер corepoolsize до 3, он начал обрабатывать сообщения. Я предполагаю, что в этом случае 1 поток был выделен для чтения сообщений из очереди, и 2 слушателям был назначен 1 поток каждый.

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

Конфигурация слушателя

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}

1 Ответ

0 голосов
/ 30 апреля 2018

Установив corePoolSize и maximumPoolSize одинаково, вы создаете fixed-size thread pool. Очень хорошее объяснение правил задокументировано здесь

Установка maxPoolSize неявно позволяет отбрасывать задачи. Однако емкость очереди по умолчанию составляет Integer.MAX_VALUE, что для практических целей является бесконечностью.

Следует обратить внимание на то, что ThreadPoolTaskExecutor использует ThreadPoolExecutor снизу, что имеет несколько необычный подход к организации очередей, описанный в документах :

Если запущено corePoolSize или более потоков, Исполнитель всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.

Это означает, что maxPoolSize имеет значение только тогда, когда очередь заполнена, иначе количество потоков никогда не будет превышать corePoolSize. Например, если мы отправляем задачи , которые никогда не выполняются , в пул потоков:

  • первые corePoolSize заявки будут начинать каждый новый поток;
  • после этого все заявки отправляются в очередь;
  • если очередь конечна и ее емкость исчерпана, каждая отправка запускает новый поток, до maxPoolSize;
  • при заполнении пула и очереди новые заявки отклоняются.

Очередь - Читать документы

Любой BlockingQueue может использоваться для передачи и хранения отправленных задач. Использование этой очереди взаимодействует с размером пула:

  • Если запущено меньше потоков corePoolSize, Исполнитель всегда предпочитает добавлять новую тему, а не ставить в очередь.
  • Если работает corePoolSize или более потоков, Исполнитель всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.
  • Если запрос не может быть поставлен в очередь, создается новый поток, если это будет превышать MaximumPoolSize, в этом случае задача будет быть отклоненным.

Unbounded queues. Использование неограниченной очереди (например, LinkedBlockingQueue без предопределенной емкости) вызовет новый задачи, стоящие в очереди в случаях, когда все потоки corePoolSize заняты. Таким образом, не будет создано больше чем corePoolSize потоков. (И поэтому значение maximumPoolSize не имеет никакого эффекта.)

  1. Если число потоков меньше corePoolSize, создайте новый Поток для запуска нового задания.
  2. Если количество потоков равно (или больше) corePoolSize, поставьте задачу в очередь.
  3. Если очередь заполнена, а количество потоков меньше, чем maxPoolSize, создайте новый поток для запуска задач.
  4. Если очередь заполнена, а число потоков больше или равный maxPoolSize, отклонить задание.
...