Агрегатор мулов - потоковая агрегация - PullRequest
1 голос
/ 30 июня 2009

Агрегатор сбора, используемый в инфраструктуре Mule 2.0, работает примерно так:

  • Входящий маршрутизатор принимает коллекцию сообщений и разбивает ее на несколько меньших сообщений - каждое меньшее сообщение помечается идентификатором корреляции, соответствующим родительскому сообщению

  • Эти сообщения проходят через различные службы

  • Наконец, эти сообщения поступают во входящий агрегатор, который собирает сообщения на основе идентификатора корреляции родительского сообщения и числа ожидаемых сообщений. После получения всех ожидаемых сообщений вызывается функция агрегирования и возвращается результат.

Теперь это работает нормально, когда количество сообщений в группе достаточно мало. Однако, как только количество сообщений в группе становится огромным ~ 100 тыс., Тогда большая часть памяти связывается с группой сообщений, ожидающих прибытия более поздних сообщений. Это усугубляется, если одновременно объединяются несколько групп.

Обойти эту проблему можно было бы с помощью потокового агрегатора. В моем случае использования я, по сути, суммирую различные сообщения на основе ключа, и это можно сделать без необходимости одновременного просмотра всех сообщений в группе. Я только хотел бы знать, что все сообщения были получены до пересылки результата на конечную точку.

Похоже ли это на разумное решение проблемы?

Это уже реализовано где-то в Муле?

Есть ли лучшие способы сделать это?

1 Ответ

2 голосов
/ 11 декабря 2009

Это кажется разумным подходом (я ни в коем случае не являюсь экспертом по Mule), и я прочитал всю документацию по Mule и не думаю, что есть что-то подобное (поддержка потоковой передачи ограничена несколько коннекторов и преобразователей - это довольно просто в том смысле, что он просто обходит InputStream). Только несколько вещей в потоке Mule, поэтому вам может потребоваться иметь другие модифицированные преобразователи (если вы их используете) в этом потоке. Вы бы просто реализовали агрегатор, обеспечивающий InputStream и начинающий потоковую передачу, как только он получит некоторую последовательную последовательность сообщений.

Однако одно предложение в вашем описании «... все сообщения были получены до отправки результатов в конечную точку» может вызывать беспокойство. Это по своей природе отрицательно сказывается на цели потоковой передачи, если только вы не имеете в виду, что вы (в вашем сервисном компоненте) будут отслеживать, что вы получили все, прежде чем пересылать (предположительно намного меньший) обработанный результат вперед.

...