@SQSListener не использует настройки параллелизма - PullRequest
0 голосов
/ 02 октября 2018

Я создаю приложение Spring Boot, которое использует очередь AWS SQS.Я могу использовать очередь, но не могу сделать это с несколькими одновременными потребителями.

При запуске из командной строки mvn spring-boot:run он подключается к SQS и будет получать сообщения одинвовремя.У метода слушателя есть дополнительный код, который вызывает задержку в несколько секунд, в течение которой я ожидал бы, что второе сообщение будет получено и обработано.

Как будто конфигурация не используется.

pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-aws</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-aws-messaging</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.11.419</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>2.0.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

основной класс:

@SpringBootApplication
public class IngestConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(IngestConsumerApplication.class, args);
    }
}

Класс конфигурации:

@Configuration
public class AppConfig {

@Value("${aws.region}")
private String region;

@Bean
@Primary
public AWSCredentialsProvider buildAWSCredentialsProvider() {
    return new ProfileCredentialsProvider();
}

@Bean
@Primary
public AmazonS3 buildS3Client(@Autowired AWSCredentialsProvider credentials) {
    return AmazonS3ClientBuilder
            .standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build();
}

@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync(@Autowired AWSCredentialsProvider credentials) {
    return AmazonSQSAsyncClientBuilder.standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build();
}

@Bean
public QueueMessageHandler queueMessageHandler(@Autowired AmazonSQSAsync sqsClient, @Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    queueMessageHandlerFactory.setAmazonSqs(sqsClient);
    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    return queueMessageHandler;
}

@Bean
public ObjectMapper buildJacksonObjectMapper() {
    return new ObjectMapper();
}

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, 
        @Autowired QueueMessageHandler queueMessageHandler,
        @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setMaxNumberOfMessages(2);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolExecutor);

    return simpleMessageListenerContainer;
}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setThreadNamePrefix("queueExec");
    executor.initialize();
    return executor;
}

}

Наконец, потребитель:

@Async
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS, value = "${queue}")
public void listener(S3EventNotification event) {
    ...
}
...