Как создать несколько параллельных очередей опроса SqsMessageDrivenChannelAdapter? - PullRequest
0 голосов
/ 17 апреля 2019

У меня действительно проблемы с пропускной способностью с spring-integration-aws s SqsMessageDrivenChannelAdapter , что является абстракцией Spring Integration вокруг spring-cloud-aws 's SimpleMessageListenerContainer.

Кажется, проблема в том, что SimpleMessageListenerContainer может запрашивать только 10 сообщений одновременно (ограничение AWS) и делает эти запросы невероятно медленно - в частности, я наблюдаю ~ 15 т / с, что слишком медленно.

Вот мой код:

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SqsConfiguration {
    // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestParameters
    private static final int SQS_RECEIVE_MAX_NUM_MESSAGES = 10;

    @Autowired
    private AmazonSQSAsync amazonSQSAsync;

    @Bean
    ThreadPoolTaskExecutor sqsThreadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        return threadPoolTaskExecutor;
    }

    @Bean
    MessageChannel receivedSqsMessageChannel() {
        return new ExecutorChannel(sqsThreadPoolTaskExecutor());
    }

    @Bean
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter() {
        final SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "...");
        sqsMessageDrivenChannelAdapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
        sqsMessageDrivenChannelAdapter.setOutputChannel(receivedSqsMessageChannel());
        sqsMessageDrivenChannelAdapter.setMaxNumberOfMessages(SQS_RECEIVE_MAX_NUM_MESSAGES);
        return sqsMessageDrivenChannelAdapter;
    }
}

receivedSqsMessageChannel - это входной поток, который в конечном итоге заканчивается сообщением Acknowledgement#acknowledge().

Учитывая, что пока нет никакой функции SimpleMessageListenerContainer (и, следовательно, SqsMessageDrivenChannelAdapter) для опроса в нескольких потоках (есть обсуждения для spring-cloud-aws, но она далеко не близка к реализации, и после этого, для spring-integration-aws для его поддержки), и TPS, который у меня есть сейчас, является недопустимым, я считаю, что лучшим "решением" сейчас является одновременный опрос нескольких SqsMessageDrivenChannelAdapter с выводом на receivedSqsMessageChannel.

Как мне этого добиться весной? Есть ли способ, чтобы пул SqsMessageDrivenChannelAdapter работал одновременно с использованием аннотаций Spring?

...