Вам может потребоваться NIFI-5601 , чтобы выполнить это, на момент написания этой статьи в данный момент рассматривается исправление, я надеюсь получить его в NiFi 1.9.0.
РЕДАКТИРОВАТЬ : добавление потенциальных обходных путей за это время
Если вы можете использовать ListDatabaseTables вместо того, чтобы получать имена таблиц из файла JSON, тогда вы можете установить Include Count
в true
. Затем вы получите атрибуты для имени таблицы и количества ее строк. Затем вы можете разделить счетчик на значение Partition Size
в GTF, и это даст вам количество выборок (назовем это X
). Затем добавьте атрибут через UpdateAttribute с именем «parent» или что-то в этом роде и установите для него значение ${UUID()}
. Сохраните эти атрибуты в файлах потоков, идущих в GTF и ExecuteScript, затем вы можете использовать Wait / Notify для ожидания получения файлов потоков X
(установка Target Signal Count
в ${X}
) и использования ${parent}
в качестве Release Signal Identifier
.
Если вы не можете использовать ListDatabaseTables, тогда вы можете иметь ExecuteSQLRecord после вашего SplitJSON, вы можете выполнить что-то вроде SELECT COUNT(*) FROM ${table.name}
. Если вы используете ExecuteSQL, вам может потребоваться ConvertAvroToJSON, если вы используете ExecuteSQLRecord, используйте JSONRecordSetWriter. Затем вы можете извлечь счет из содержимого файла потока, используя EvaluateJsonPath.
Если у вас есть имя таблицы и количество строк в атрибутах, вы можете продолжить описанный выше поток (т. Е. Определить количество файлов потоков, которые создаст GTF, и т. Д.).