Как обработать более 10 одновременных сообщений из очереди AWS SQS FiFo с помощью Spring Integration - PullRequest
0 голосов
/ 14 сентября 2018

Я хочу иметь возможность обрабатывать более 10 сообщений SQS одновременно, используя рабочий процесс Spring Integration.

Из этого вопроса рекомендуется использовать ExecutorChannel . Я обновил свой код, но у меня остались те же симптомы.

Как выполнить поток интеграции Spring в нескольких потоках, чтобы параллельно использовать больше сообщений очереди Amazon SQS?

После этого обновления мое приложение запрашивает 10 сообщений, обрабатывает их и только после того, как я сделаю вызов amazonSQSClient.deleteMessage ближе к концу потока, оно примет еще 10 сообщений из очереди SQS. .

Приложение использует очередь SQS FiFo.

Есть что-то еще, что я упускаю, или это неизбежный признак использования SqsMessageDeletionPolicy.NEVER и последующего удаления сообщений в конце потока? Принятие сообщений в начале потока на самом деле не вариант из-за других ограничений.

Вот соответствующие фрагменты кода с некоторыми упрощениями, но я надеюсь, что это выражает проблему.

Настройка очереди

@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    executor.setConcurrencyLimit(50);
    return executor;
}

@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
    return new ExecutorChannel(inputChannelTaskExecutor());
}

Я также попробовал ThreadPoolTaskExecutor вместо SimpleAsyncTaskExecutor с тем же результатом, но я также включу его, на случай, если он предложит другое понимание.

    @Bean
    public AsyncTaskExecutor inputChannelTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("spring-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.afterPropertiesSet();
        executor.initialize();
        return executor;
    }

Канальный адаптер SQS

@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
    adapter.setOutputChannel(inputChannel);
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
    return adapter;
}


@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}

Упрощенный основной поток

Обычный сценарий для нас - это получить несколько правок Ветви за короткий промежуток времени. Этот поток лишь «заботится» о том, чтобы произошло хотя бы одно редактирование. messageTransformer извлекает идентификатор из документа полезной нагрузки и помещает его в заголовок dsp_docId , который мы затем используем для агрегирования (мы используем этот идентификатор в нескольких других местах, поэтому мы почувствовали заголовок имеет смысл, а не делать всю работу в пользовательском агрегаторе).

provisioningServiceActivator извлекает последнюю версию Branch, затем маршрутизатор решает, нужно ли ему дальнейшие преобразования (в этом случае он отправляет его в transformBranchChannel ) или его можно отправить на наш экземпляр PI (через sendToPiChannel).

Поток преобразования (не показан, я не думаю, что он вам нужен) в конечном итоге приводит к отправке в поток PI, сначала он просто выполняет больше работы.

listGroupProcessor захватывает все заголовки aws_receiptHandle и добавляет их в новый заголовок как | разделенный список.

Поток sendToPi (и errorFlow) заканчивается вызовом пользовательского обработчика, который заботится об удалении всех сообщений SQS, указанных в этом списке строк aws_receiptHandle.

@Bean
IntegrationFlow sqsListener() {
    return IntegrationFlows.from(inputChannel)
                           .transform(messageTransformer)
                           .aggregate(a -> a.correlationExpression("1")
                                            .outputProcessor(listingGroupProcessor)
                                            .autoStartup(true)
                                            .correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
                                            .groupTimeout(messageAggregateTimeout)  // currently 25s
                                            .expireGroupsUponCompletion(true)
                                            .sendPartialResultOnExpiry(true)
                                            .get())

                           .handle(provisioningServiceActivator, "handleStandard")
                           .route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
                                  routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
                                                          .resolutionRequired(false)
                                                          .defaultOutputToParentFlow())

                           .channel(sendtoPiChannel)
                           .get();
}

1 Ответ

0 голосов
/ 17 сентября 2018

Я думал, что опубликую это как ответ, так как это решит мою проблему и может помочь другим. В качестве ответа, скорее всего, будет замечен, а не редактирование исходного вопроса, который может быть пропущен.

Во-первых, я должен был отметить, что мы используем FiFo очередь.

Проблема была на самом деле дальше по цепочке, где мы устанавливали MessageGroupId в простое значение, которое описывает источник данных. Это означало, что у нас были очень большие группы сообщений.

Из документации ReceiveMessage вы можете видеть, что это довольно разумно останавливает запрос дополнительных сообщений из этой группы в этом сценарии, поскольку было бы невозможно гарантировать порядок, если сообщение необходимо будет вернуть обратно. очередь.

Обновление кода, отправляющего сообщение, для установки соответствующего MessageGroupId , означало, что ExecutorChannel работал должным образом.

Хотя сообщения с определенным MessageGroupId невидимы, больше сообщений, принадлежащих тому же MessageGroupId, не возвращается, пока не истечет время ожидания видимости. Вы по-прежнему можете получать сообщения с другим MessageGroupId, если он также отображается.

...