Выполнение кода Python в Apache Nifi ExecuteStreamCommand - PullRequest
0 голосов
/ 19 февраля 2019

Я пытаюсь запустить код Python в процессоре Nifi ExecuteStreamCommand.

Код включает в себя не чистые модули Python, такие как Pandas и Numpy, поэтому использовать Nifi executeScript не вариант.

проблема заключается в чтении потокового файла и изменении содержимого потокового файла.

Очевидно, что можно прочитать входящий потоковый файл с помощью STDIN и записать с помощью STDOUT, см. этот вопрос SO: Python Script с использованием ExecuteStreamCommand

Но я не смог заставить это работать.

1.Пробовал просто читать в CSV из STDIN и модифицировать его, но при отправке на процессор putFile файл такой же.

import sys
import pandas as pd
import io

df = pd.read_csv(io.StringIO(sys.stdin.read(1)))
df2 = pd.DataFrame([[5, 6], [7, 8]], columns=list('AB'))
df2 = df.append(df2)

2.Попытка обернуть какой-то другой код в функцию и вернуть в предположении, что вывод функции будет идти в STDOUT, но тот же результат.

def convert_csv_dataframe():
    a = pd.read_csv(io.StringIO(sys.stdin.read(1)))
    a.replace(["ABC", "AB"], "A", inplace=True)
    return a

convert_csv_dataframe()

Если кто-то может помочь, это будет очень цениться.:

Этот код работает.Вопрос был в Нифи.Я читал из «оригинального» отношения вместо отношения «выходной поток».Обратите внимание, что stdin читает одну строку, но не думаю, что это должно иметь значение.Единственный вопрос, который у меня возникает: могу ли я ссылаться на сам файл потока (не на его содержимое) из executeStreamCommand?

import sys

a = sys.stdin.readline()
a = a.upper()
sys.stdout.write(a)

1 Ответ

0 голосов
/ 19 февраля 2019

Я думаю, вам нужно написать в STDOUT где-нибудь в вашем скрипте.Я немного знаю Python, но оба примера выглядят так, как будто вы читаете из STDIN, а затем изменяете данные в памяти, но никогда не записываете их обратно.

...