Как найти все файлы, созданные GenerateTableFetch был обработан - PullRequest
0 голосов
/ 14 ноября 2018

У нас есть поток, в котором GenerateTableFetch принимает входные данные из splitJson, которые в качестве аргумента указывают TableName, ColumnName. Сразу несколько таблиц передаются в качестве входных данных для GenerateTableFetch, а затем в следующем ExecuteSql выполняет запрос.

Теперь я хочу запустить новый процесс, когда все файлы для таблицы были обработаны указанным ниже процессором (в конце есть PutFile).

Как найти, что все файлы, созданные для таблицы, были обработаны?

1 Ответ

0 голосов
/ 14 ноября 2018

Вам может потребоваться NIFI-5601 , чтобы выполнить это, на момент написания этой статьи в данный момент рассматривается исправление, я надеюсь получить его в NiFi 1.9.0.

РЕДАКТИРОВАТЬ : добавление потенциальных обходных путей за это время

Если вы можете использовать ListDatabaseTables вместо того, чтобы получать имена таблиц из файла JSON, тогда вы можете установить Include Count в true. Затем вы получите атрибуты для имени таблицы и количества ее строк. Затем вы можете разделить счетчик на значение Partition Size в GTF, и это даст вам количество выборок (назовем это X). Затем добавьте атрибут через UpdateAttribute с именем «parent» или что-то в этом роде и установите для него значение ${UUID()}. Сохраните эти атрибуты в файлах потоков, идущих в GTF и ExecuteScript, затем вы можете использовать Wait / Notify для ожидания получения файлов потоков X (установка Target Signal Count в ${X}) и использования ${parent} в качестве Release Signal Identifier .

Если вы не можете использовать ListDatabaseTables, тогда вы можете иметь ExecuteSQLRecord после вашего SplitJSON, вы можете выполнить что-то вроде SELECT COUNT(*) FROM ${table.name}. Если вы используете ExecuteSQL, вам может потребоваться ConvertAvroToJSON, если вы используете ExecuteSQLRecord, используйте JSONRecordSetWriter. Затем вы можете извлечь счет из содержимого файла потока, используя EvaluateJsonPath.

Если у вас есть имя таблицы и количество строк в атрибутах, вы можете продолжить описанный выше поток (т. Е. Определить количество файлов потоков, которые создаст GTF, и т. Д.).

...