Я использую сообщения из очереди SQS FIFO с установленным maxMessagesPerPoll=5
.
В настоящее время я обрабатываю каждое сообщение индивидуально, что является пустой тратой ресурсов.
В моем случае, поскольку мы используем очередь FIFO и все эти 5 сообщений связаны с одним и тем же объектом, я мог бы обработать их все вместе.
Я думал, что это можно сделать с помощью шаблона aggregate
, но я не смог получить никаких результатов.
Мой потребительский маршрут выглядит так:
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.process(exchange -> {
// process the message
})
Я считаю, что можно сделать что-то подобное
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.aggregate(const(true), new GroupedExchangeAggregationStrategy())
.completionFromBatchConsumer()
.process(exchange -> {
// process ALL messages together as I now have a list of all exchanges
})
но processor
никогда не вызывается.
Вторая вещь:
Если я смогу сделать это, когда ACK отправляется в SQS? Когда обрабатывается каждое отдельное сообщение или когда завершается совокупный процесс? Я надеюсь, что последний