У меня есть конвейер в Нифи, который сбрасывает некоторые недействительные JSON, которые мне нужно очистить. Лучшее решение, которое я придумал, - это запустить скрипт Python через ExecuteStreamCommand
и одновременно очистить / разделить его одним движением oop. Однако, хотя я использую sys.stdout.write()
в моем для l oop, только исходный JSON выходит в выходной поток в NiFi.
Я неправильно использую sys.stdout.write()
или это возможно, но я только что сделал что-то не так? Моя конечная цель - чтобы каждая строка json была новым файлом потока, т.е. файл 1 - {"fruit":"apple",...
, файл 2 - {"fruit":"cherry",...
и т. Д.
пример JSON
{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo", }, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais", }, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london", }, "color":"yellow"}
скрипт
import json
import re
import sys
flow_file = sys.stdin.read()
try:
load = json.loads(flow_file)
sys.stdout.write(flow_file)
except:
flow_file_esc = re.sub(r"[(\\)]", "", flow_file)
for f in flow_file_esc.splitlines():
sys.stdout.write(str(f))