Использование sys.stdout.write () для создания нескольких файлов в NiFi? - PullRequest
0 голосов
/ 24 февраля 2020

У меня есть конвейер в Нифи, который сбрасывает некоторые недействительные JSON, которые мне нужно очистить. Лучшее решение, которое я придумал, - это запустить скрипт Python через ExecuteStreamCommand и одновременно очистить / разделить его одним движением oop. Однако, хотя я использую sys.stdout.write() в моем для l oop, только исходный JSON выходит в выходной поток в NiFi.

Я неправильно использую sys.stdout.write() или это возможно, но я только что сделал что-то не так? Моя конечная цель - чтобы каждая строка json была новым файлом потока, т.е. файл 1 - {"fruit":"apple",..., файл 2 - {"fruit":"cherry",... и т. Д.

пример JSON

{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo", }, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais", }, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london", }, "color":"yellow"}

скрипт

import json
import re
import sys

flow_file = sys.stdin.read()
try:
    load = json.loads(flow_file)
    sys.stdout.write(flow_file)
except:
    flow_file_esc = re.sub(r"[(\\)]", "", flow_file)
    for f in flow_file_esc.splitlines():
        sys.stdout.write(str(f))

1 Ответ

0 голосов
/ 24 февраля 2020

Можно ли сначала очистить файл с помощью ReplaceText, а затем разделить его с помощью Split Json, SplitRecord или ForkRecord?

Если вам нужно объединить две операции и вы хотите написать их сценарий, вы можете попробовать выполнить ExecuteScript с Jython (поскольку это не похоже на использование собственных CPython библиотек), у меня есть несколько простых примеров в моей кулинарной книге и в моем блоге .

...