Как передать потоковые файлы в скрипт Execute Python и использовать атрибуты и переменные Nifi для хранения этого файла? - PullRequest
1 голос
/ 10 мая 2019

Я новичок в NiFi и Python, и мне нужна ваша помощь, чтобы передать значение атрибута Flow File в скрипт. Скрипт конвертирует вложенный json в csv. Когда я запускаю скрипт локально, он работает.

Как передать имя FlowFile в src_json и tgt_csv?

Спасибо

Роза

import pandas as pd
import json
from pandas.io.json import json_normalize

src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

jfile = open(src_json)
jdata = json.load(jfile)

...rest of the code...
```python

1 Ответ

0 голосов
/ 10 мая 2019

У вас есть несколько вариантов для выполнения этой задачи.

  1. Как указал Arun211 , существует процессор ConvertRecord, который в значительной степени выполняет эту задачу. Если ваш вложенный JSON является проблемой или у вас есть другие причины для того, чтобы сделать это в скрипте Python, продолжайте ниже.
  2. Если у вас есть скрипт Python, который выполняет эту задачу, как показано выше, вам нужно будет вызвать его из NiFi, предоставляя данные для скрипта. Ты можешь использовать:
    1. ExecuteScript (лучше для создания прототипов) и InvokeScriptedProcessor (более производительно для производственных задач) позволяют запускать скрипты Python (фактически Jython ) внутри экземпляра NiFi. Это дает вам прямой доступ к некоторым удобным методам и функциональности. Однако, поскольку Jython не может обрабатывать нативно скомпилированные библиотеки Python, вы не сможете использовать pandas в этом коде. См. Здесь для инструкций по настройке этого процессора и здесь, почему pandas не будет работать .
    2. Если вам требуется pandas для некоторых функций, вам нужно сохранить скрипт в виде файла Python в локальной файловой системе и вызвать его как команду оболочки, используя ExecuteStreamCommand (если вам необходимо ввести данные для этого процессора) или ExecuteProcess (если это первый процессор в вашем потоке). Эти процессоры по существу запускают команду оболочки, такую ​​как python my_python_script_with_pandas.py -someargExecuteProcess) или python my_python_script_with_pandas.py с содержимым потокового файла, равным STDINExecuteStreamCommand), и выходные данные STDOUT записываются как полученное содержимое потокового файла.

В настоящее время ваш сценарий ищет входящий файл JSON в статическом расположении файла и помещает полученный CSV в другое местоположение статического файла. Вам нужно будет изменить скрипт, чтобы выполнить одно из следующих действий:

  1. Считайте эти пути из аргументов командной строки и передайте их в соответствующем свойстве процессора выбранного процессора. Эти свойства могут быть заполнены из атрибутов потокового файла , так что вы можете сделать что-то вроде Командные аргументы : -inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file} или любую их комбинацию. Затем ваш скрипт будет читать аргументы -inputfile и -outputfile, чтобы определить пути.
  2. Считайте входящие данные напрямую из STDIN примера здесь . Затем обработайте данные JSON, преобразуйте их в CSV и верните через STDOUT. NiFi будет использовать эти данные, помещать их как содержимое результирующего потокового файла и отправлять на следующий процессор (ы) в вашем потоке.
  3. Два предыдущих варианта сохраняют ваш Python-скрипт независимым от NiFi; он не знает ни о каких конструкциях «потока». Эта опция сделает его специфичным для NiFi, но предоставит дополнительную функциональность (см. Опцию 2.1 выше). Чтобы написать код Python, который читает и записывает непосредственно из / в содержимое потокового файла, посмотрите этот пример ExecuteScript процессора, обрабатывающего содержимое потокового файла в Python .
...