Существует ли корпоративный шаблон интеграции для объединения элементов из 2 очередей? - PullRequest
2 голосов
/ 25 марта 2011

У меня есть две системы, которые вырабатывают разные типы сообщений, которые мне нужны, а затем генерируют новые сообщения на их основе.

  • Первое создает сообщения NEWFILE, содержащие имя файла, которые поступают в очередь queue: file.new .
  • Второй генерирует сообщения TASK, содержащие имя файла и тип задачи, и идут в очередь: задача .

  • Для данного имени файла будет только одно NEWFILE сообщение.

  • Для данного имени файла будет несколько сообщений TASK, каждое из которых имеет свой тип задачи.
  • Сообщения могут поступать в любом порядке.

Мне нужно переслать каждое сообщение TASK, которое достигает очереди: выполнить , но только , когда поступило сообщение NEWFILE для соответствующего имени файла.

В настоящее время мы выполняем это сопоставление в другой (не основанной на очереди) системе, которая использует опрос для нахождения ФАЙЛОВ и ЗАДАЧ и поддерживает две таблицы: одну для ФАЙЛОВ и одну для ЗАДАЧ, и при обнаружении любого из них мы проверяем совпадения Есть и огонь соответствующей задачи.

Мы надеемся заменить это решением для обмена сообщениями, в котором мы поддерживаем как можно меньше состояний и максимально используем шаблоны предприятия (Apache Camel).

Мне не ясно, какой шаблон / компоненты будут соответствовать моим требованиям.

Спасибо, Том

1 Ответ

0 голосов
/ 15 июня 2011

Похоже, вам следует использовать шаблон Aggregator . Вы можете использовать различные опции компонента, чтобы указать критерии завершения и корреляцию между сообщениями

Например, следующее требование означает, что вы, вероятно, должны учитывать имя файла на вашем correlationExpression:

Будет только одно сообщение NEWFILE для данного имени файла будет несколько сообщений TASK для данного Имя файла

Для обработки задач вы можете временно сохранять сообщения задач на newExchange объект, а затем, когда соответствующий , вы можете сбросить сохраненные сообщения в их целевые очереди. Я полагаю, у вас есть два основных варианта для соответствующего момента:

  1. Дождитесь всех сообщений определенного контекста, которые будут агрегированы (сообщения NEWFILE и TASK) с использованием критериев завершения, таких как размер, время ожидания и т. Д., Которые вы можете использовать, например, completionPredicate опция для оценки того, завершен ли контекст или нет. А после завершения отправьте задачи в следующую очередь.

  2. Подождите, пока не придет сообщение NEWFILE, сбросьте временно сохраненные сообщения TASK в целевую очередь, а затем дождитесь оставшихся сообщений TASK (опять же, используя соответствующие критерии, чтобы указать, что контекст сообщения закончился).

Я не пробовал этого в коде, но я полагаю, что хотя первый вариант может быть проще, второй вариант может повысить производительность, поскольку сообщения TASK передаются в целевые очереди как можно скорее, Первые задачи, вероятно, будут выполнены раньше, чем последние, в то время как в первом случае вам придется ждать выполнения всех задач и только после этого запускать процесс. Кроме того, поскольку вы также удаляете временно сохраненные сообщения как можно скорее, вы также получаете более низкие требования к хранилищу по сравнению с первым вариантом. Это означает, что если вы имеете дело с большими сообщениями и большими объемами, вам потребуется меньше памяти, если это не так ... тогда оба решения могут быть похожими.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...