Как уже указывалось, существуют стандартные процессоры, которые выполняют sh эту задачу, а Max Bin Age
является необязательным свойством, поэтому вам не нужно его устанавливать.
Если вы настаиваете на выполнении этого вручную с помощью сценария, вы должны написать сценарий, который получает несколько потоковых файлов из сеанса, используя def flowfiles = session.get(100)
(настройте размер пакета, чтобы убедиться, что вы получаете все связанные потоковые файлы из входящей очереди не нагружая кучу), отфильтруйте и сгруппируйте их по определенному атрибуту c, а затем объедините все содержимое с содержимым нового потокового файла. Обновите входящие потоковые файлы и включите все UUID в атрибут объединенного потокового файла. Затем перенесите все объединенные потоковые файлы в «исходные» отношения, а объединенные потоковые файлы в пользовательские «объединенные» отношения (вам нужно будет использовать InvokeScriptedProcessor
для обеспечения настраиваемых отношений). Любые потоковые файлы, которые вы не объединили, нужно будет вернуть во входящую очередь.
Это только краткое изложение. Необходимо рассмотреть дополнительные детали и крайние случаи. Я настоятельно рекомендую не создавать для этого специальный обработчик сценариев, когда уже существуют надежные, проверенные решения. Если вы все же хотите написать свой собственный, посмотрите существующий код для процессора MergeContent
, чтобы понять, как он работает. В Руководстве разработчика Apache NiFi также есть раздел , посвященный шаблонам проектирования (хотя он не охватывает многие-к-одному , как это уже делают стандартные процессоры).