Как читать файлы в процессоре ExecuteStreamCommand в NiFi - PullRequest
0 голосов
/ 25 апреля 2019

Моя конечная цель - замаскировать данные в одном конкретном файле. Я хочу переместить файлы из одного места в другое. Во время этого процесса передачи я должен замаскировать данные, используя скрипт Python. Итак, я разработал ниже потока:

GetFile > ExecuteStreamCommmand > PutFile

Я разработал один Python скрипт, используя pandas. Я запускаю этот NiFi на виртуальной машине, созданной на Google Cloud Platform, где я установил Python-2.7 и NiFi-1.9.1. Ниже мой код Панды:

import pandas as pd
readFile = pd.read_csv("/path",sep=" ",header=None)
readFile.columns = ['IP']
readFile['IP'] = readFile['IP'].replace(regex='((?<=[0-9])[0-9]|(?<=\.)[0-9])',value='X')
readFile.to_csv("/path", sep=' ')

У меня ниже сомнения:
1) Используя процессор getFile, я передаю файл в очереди следующему процессору, т.е. процессору ExecuteStreamCommand.
2) Кроме того, в своем коде Python я пытаюсь прочитать данные из того же входного каталога, который был передан в процессор GetFile, но теперь файл был перемещен в очередь между getfile> executetestreamcommand. Так как это будет читать?
3) После того, как скрипт python будет выполнен, как я могу использовать процессор putFile, чтобы сохранить его в другом месте?

Я новичок в NiFi, поэтому пытаюсь понять основные вещи. Также я приложил скриншот потока и ошибки. enter image description here

1 Ответ

1 голос
/ 25 апреля 2019

ExecuteStreamCommand

Содержимое файла потока передается в команду (в вашем случае это python) как stdin stream

Итак, вы должны использовать следующий код:

readFile = pd.read_json(sys.stdin)

с другой стороны, если вам нужно применить regexp replace к файлу потока, вы можете попробовать использовать ReplaceText процессор вместо ExecuteStreamCommand

...