Агрегатор сбора, используемый в инфраструктуре Mule 2.0, работает примерно так:
Входящий маршрутизатор принимает коллекцию сообщений и разбивает ее на несколько меньших сообщений - каждое меньшее сообщение помечается идентификатором корреляции, соответствующим родительскому сообщению
Эти сообщения проходят через различные службы
Наконец, эти сообщения поступают во входящий агрегатор, который собирает сообщения на основе идентификатора корреляции родительского сообщения и числа ожидаемых сообщений. После получения всех ожидаемых сообщений вызывается функция агрегирования и возвращается результат.
Теперь это работает нормально, когда количество сообщений в группе достаточно мало. Однако, как только количество сообщений в группе становится огромным ~ 100 тыс., Тогда большая часть памяти связывается с группой сообщений, ожидающих прибытия более поздних сообщений. Это усугубляется, если одновременно объединяются несколько групп.
Обойти эту проблему можно было бы с помощью потокового агрегатора. В моем случае использования я, по сути, суммирую различные сообщения на основе ключа, и это можно сделать без необходимости одновременного просмотра всех сообщений в группе. Я только хотел бы знать, что все сообщения были получены до пересылки результата на конечную точку.
Похоже ли это на разумное решение проблемы?
Это уже реализовано где-то в Муле?
Есть ли лучшие способы сделать это?