Выполнение сообщений в той же последовательности после получения сообщений из очереди sqs fifo - PullRequest
0 голосов
/ 25 апреля 2018

Я определяю прослушиватель сообщений, аналогичный тому, который определен ниже.Сообщения представляют собой операторы запросов.Сообщения помещаются в очередь последовательно, чтобы не нарушать правила внешнего ключа при отправке в таблицы.Когда я устанавливаю maxnumberofmessages равным 10, я вижу, что запросы выполняются не по порядку.Однако, когда я установил его на 1, я не вижу никаких проблем.Как гарантировать, что сообщения будут прочитаны в той же последовательности, что и в очереди, когда я установил для maxnumberofmessages более высокое значение, чем 1?

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(10);
    return simpleMessageListenerContainer;
}


@SqsListener(value = "${sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    repository.execute(serviceData);

}

Модифицированный код

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {
    SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    msgListenerContainerFactory.setAmazonSqs(amazonSQSAsyncClient());
    msgListenerContainerFactory.setAutoStartup(false);
    msgListenerContainerFactory.setMaxNumberOfMessages(10);
    msgListenerContainerFactory.setTaskExecutor(threadPoolTaskExecutor());
    return msgListenerContainerFactory;
}


@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(1);
    executor.setThreadNamePrefix("queueExecutor");
    executor.initialize();
    return executor;
}

@SqsListener(value = "${sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
    repository.execute(serviceData);

}

1 Ответ

0 голосов
/ 25 апреля 2018

Это потому, что логика там:

   ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
   CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
   for (Message message : receiveMessageResult.getMessages()) {
        if (isQueueRunning()) {
             MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
             getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
        } else {
            messageBatchLatch.countDown();
        }
  }

Обратите внимание на getTaskExecutor().execute(). Вот как ваши сообщения переносятся в собственный поток для исполнения.

Вы можете переконфигурировать его в ThreadPoolTaskExecutor с размером пула 1. И все ваши сообщения будут обрабатываться в одной ветке, поэтому заказ будет доставлен.

...