Я пытаюсь запустить код Python в процессоре Nifi ExecuteStreamCommand.
Код включает в себя не чистые модули Python, такие как Pandas и Numpy, поэтому использовать Nifi executeScript не вариант.
проблема заключается в чтении потокового файла и изменении содержимого потокового файла.
Очевидно, что можно прочитать входящий потоковый файл с помощью STDIN и записать с помощью STDOUT, см. этот вопрос SO: Python Script с использованием ExecuteStreamCommand
Но я не смог заставить это работать.
1.Пробовал просто читать в CSV из STDIN и модифицировать его, но при отправке на процессор putFile файл такой же.
import sys
import pandas as pd
import io
df = pd.read_csv(io.StringIO(sys.stdin.read(1)))
df2 = pd.DataFrame([[5, 6], [7, 8]], columns=list('AB'))
df2 = df.append(df2)
2.Попытка обернуть какой-то другой код в функцию и вернуть в предположении, что вывод функции будет идти в STDOUT, но тот же результат.
def convert_csv_dataframe():
a = pd.read_csv(io.StringIO(sys.stdin.read(1)))
a.replace(["ABC", "AB"], "A", inplace=True)
return a
convert_csv_dataframe()
Если кто-то может помочь, это будет очень цениться.:
Этот код работает.Вопрос был в Нифи.Я читал из «оригинального» отношения вместо отношения «выходной поток».Обратите внимание, что stdin читает одну строку, но не думаю, что это должно иметь значение.Единственный вопрос, который у меня возникает: могу ли я ссылаться на сам файл потока (не на его содержимое) из executeStreamCommand?
import sys
a = sys.stdin.readline()
a = a.upper()
sys.stdout.write(a)