Совокупные результаты потребителя партии в Camel (например, из SQS) - PullRequest
0 голосов
/ 02 мая 2018

Я использую сообщения из очереди SQS FIFO с установленным maxMessagesPerPoll=5.

В настоящее время я обрабатываю каждое сообщение индивидуально, что является пустой тратой ресурсов. В моем случае, поскольку мы используем очередь FIFO и все эти 5 сообщений связаны с одним и тем же объектом, я мог бы обработать их все вместе.

Я думал, что это можно сделать с помощью шаблона aggregate, но я не смог получить никаких результатов.

Мой потребительский маршрут выглядит так:

from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .process(exchange -> {
        // process the message
    })

Я считаю, что можно сделать что-то подобное

from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
    .aggregate(const(true), new GroupedExchangeAggregationStrategy())
    .completionFromBatchConsumer()
    .process(exchange -> {
        // process ALL messages together as I now have a list of all exchanges
    })

но processor никогда не вызывается.

Вторая вещь: Если я смогу сделать это, когда ACK отправляется в SQS? Когда обрабатывается каждое отдельное сообщение или когда завершается совокупный процесс? Я надеюсь, что последний

Ответы [ 2 ]

0 голосов
/ 04 мая 2018

Проблема заключается в GroupedExchangeAggregationStrategy

Когда я использую эту стратегию, на выходе получается «массив» всех бирж. Это означает, что обмен, который приходит к предикату завершения, больше не имеет начальных свойств. Вместо этого он имеет CamelGroupedExchange и CamelAggregatedSize, которые не используются для completionFromBatchConsumer()

Поскольку мне не нужно объединять все биржи, достаточно использовать GroupedBodyAggregationStrategy. Тогда обменные свойства останутся такими же, как и в исходном обмене, и только тело будет содержать «массив»

Другим решением будет использование completionSize(Predicate predicate) и использование пользовательского предиката, который извлекает необходимое значение из групповых обменов.

0 голосов
/ 03 мая 2018

Когда процессор не вызывается, агрегатор, вероятно, все еще ожидает агрегирования новых сообщений .

Вы можете попробовать использовать completionSize(5) вместо completionFromBatchConsumer() для теста. Если это работает, проблема определения пакета завершена.

Для ACK против брокера: к сожалению, нет . Я думаю, что сообщение передается, когда оно поступает в агрегатор.

Компонент-агрегатор Camel является компонентом с отслеживанием состояния, поэтому он должен завершить текущую транзакцию .

По этой причине вы можете оборудовать такие компоненты постоянными репозиториями, чтобы избежать потери данных при остановке процесса. В таком случае уже агрегированные сообщения, очевидно, будут потеряны, если у вас нет постоянного хранилища.

...