Похоже, вам следует использовать шаблон Aggregator .
Вы можете использовать различные опции компонента, чтобы указать критерии завершения и корреляцию между сообщениями
Например, следующее требование означает, что вы, вероятно, должны учитывать имя файла на вашем correlationExpression
:
Будет только одно сообщение NEWFILE
для данного имени файла будет
несколько сообщений TASK для данного
Имя файла
Для обработки задач вы можете временно сохранять сообщения задач на
newExchange
объект, а затем, когда соответствующий , вы можете сбросить сохраненные сообщения в их целевые очереди. Я полагаю, у вас есть два основных варианта для соответствующего момента:
Дождитесь всех сообщений определенного контекста, которые будут агрегированы (сообщения NEWFILE и TASK) с использованием критериев завершения, таких как размер, время ожидания и т. Д., Которые вы можете использовать, например, completionPredicate
опция для оценки того, завершен ли контекст или нет. А после завершения отправьте задачи в следующую очередь.
Подождите, пока не придет сообщение NEWFILE, сбросьте временно сохраненные сообщения TASK в целевую очередь, а затем дождитесь оставшихся сообщений TASK (опять же, используя соответствующие критерии, чтобы указать, что контекст сообщения закончился).
Я не пробовал этого в коде, но я полагаю, что хотя первый вариант может быть проще, второй вариант может повысить производительность, поскольку сообщения TASK передаются в целевые очереди как можно скорее, Первые задачи, вероятно, будут выполнены раньше, чем последние, в то время как в первом случае вам придется ждать выполнения всех задач и только после этого запускать процесс. Кроме того, поскольку вы также удаляете временно сохраненные сообщения как можно скорее, вы также получаете более низкие требования к хранилищу по сравнению с первым вариантом. Это означает, что если вы имеете дело с большими сообщениями и большими объемами, вам потребуется меньше памяти, если это не так ... тогда оба решения могут быть похожими.