Как реализовать эквивалент Агрегатора EIP в Nifi - PullRequest
0 голосов
/ 24 августа 2018

У меня большой опыт работы с Apache Camel и EIP, и я пытаюсь понять, как реализовать эквиваленты в Nifi. Я понимаю, что Нифи использует другую парадигму (потоковое программирование), но я не думаю, что то, что я пытаюсь сделать, является необоснованным.

В двух словах, я хочу, чтобы содержимое каждого файла отправлялось во многие остальные службы, и я хочу объединить ответы в один документ, который будет храниться вasticsearch. Я мог бы также выполнить дополнительную обработку и очистку, чтобы улучшить то, что хранится (но это не моя непосредственная проблема)

Скриншот - это быстрый макет того, чего я пытаюсь достичь, но я недостаточно разбираюсь в Nifi, чтобы знать, как правильно реализовать этот шаблон.

Nfii Screenshot

1 Ответ

0 голосов
/ 24 августа 2018

Если вы собираетесь взять один фрагмент данных, а затем выполнить переход к нескольким частям потока и затем сходиться обратно, MergeContent должен быть способ узнать, какие части объединяются.

Обычно это можно сделать двумя способами ...

Первый использует MergeContent в «режиме дефрагментации». Думайте об этом как об отмене операции разделения, которая была выполнена одним из процессоров разделения, таких как SplitText. Например, вы разбиваете файл из 100 строк на 100 потоковых файлов по 1 строке каждый, затем делаете что-то для каждого из них, а затем хотите вернуться обратно. Процессоры разделения производят стандартный набор атрибутов разделения (описанных в документации по процессорам), и режим дефрагментации знает, как соответствующим образом объединить разделения и объединить их вместе. Это, вероятно, не относится к вашему примеру, так как вы не начали с разделенного процессора.

Второй подход - это «Атрибут корреляции» в MergeConent. Это говорит слиянию содержимого только для объединения файлов потока, имеющих одинаковое значение для указанного атрибута. В вашем примере, когда файл выбирается GetFile и отправляется 3 процессорам InvokeHttp, создаются 3 файла потока, и для каждого из них должен быть установлен атрибут «filename» с именем файла, взятого с диска. Поэтому указание MergeContent сопоставлять имя файла должно помочь, и, вероятно, установить минимальное и максимальное количество записей равным ожидаемому вами значению, например 3, и максимальное время в случае сбоя или зависания одного из них.

...