Я хочу иметь возможность обрабатывать более 10 сообщений SQS одновременно, используя рабочий процесс Spring Integration.
Из этого вопроса рекомендуется использовать ExecutorChannel . Я обновил свой код, но у меня остались те же симптомы.
Как выполнить поток интеграции Spring в нескольких потоках, чтобы параллельно использовать больше сообщений очереди Amazon SQS?
После этого обновления мое приложение запрашивает 10 сообщений, обрабатывает их и только после того, как я сделаю вызов amazonSQSClient.deleteMessage ближе к концу потока, оно примет еще 10 сообщений из очереди SQS. .
Приложение использует очередь SQS FiFo.
Есть что-то еще, что я упускаю, или это неизбежный признак использования SqsMessageDeletionPolicy.NEVER и последующего удаления сообщений в конце потока? Принятие сообщений в начале потока на самом деле не вариант из-за других ограничений.
Вот соответствующие фрагменты кода с некоторыми упрощениями, но я надеюсь, что это выражает проблему.
Настройка очереди
@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(50);
return executor;
}
@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
return new ExecutorChannel(inputChannelTaskExecutor());
}
Я также попробовал ThreadPoolTaskExecutor вместо SimpleAsyncTaskExecutor с тем же результатом, но я также включу его, на случай, если он предложит другое понимание.
@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(50);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("spring-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.afterPropertiesSet();
executor.initialize();
return executor;
}
Канальный адаптер SQS
@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
adapter.setOutputChannel(inputChannel);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
return adapter;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}
Упрощенный основной поток
Обычный сценарий для нас - это получить несколько правок Ветви за короткий промежуток времени. Этот поток лишь «заботится» о том, чтобы произошло хотя бы одно редактирование. messageTransformer извлекает идентификатор из документа полезной нагрузки и помещает его в заголовок dsp_docId , который мы затем используем для агрегирования (мы используем этот идентификатор в нескольких других местах, поэтому мы почувствовали заголовок имеет смысл, а не делать всю работу в пользовательском агрегаторе).
provisioningServiceActivator извлекает последнюю версию Branch, затем маршрутизатор решает, нужно ли ему дальнейшие преобразования (в этом случае он отправляет его в transformBranchChannel ) или его можно отправить на наш экземпляр PI (через sendToPiChannel).
Поток преобразования (не показан, я не думаю, что он вам нужен) в конечном итоге приводит к отправке в поток PI, сначала он просто выполняет больше работы.
listGroupProcessor захватывает все заголовки aws_receiptHandle и добавляет их в новый заголовок как | разделенный список.
Поток sendToPi (и errorFlow) заканчивается вызовом пользовательского обработчика, который заботится об удалении всех сообщений SQS, указанных в этом списке строк aws_receiptHandle.
@Bean
IntegrationFlow sqsListener() {
return IntegrationFlows.from(inputChannel)
.transform(messageTransformer)
.aggregate(a -> a.correlationExpression("1")
.outputProcessor(listingGroupProcessor)
.autoStartup(true)
.correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
.groupTimeout(messageAggregateTimeout) // currently 25s
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.get())
.handle(provisioningServiceActivator, "handleStandard")
.route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
.resolutionRequired(false)
.defaultOutputToParentFlow())
.channel(sendtoPiChannel)
.get();
}