Я пытаюсь написать текстовый файл, содержащий словарь для обратного вызова outputStream в процессоре Nifi ExcuteScript.Вот мой код, но он не работает должным образом -
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import os
import java.io.FileOutputStream
ofile=open("/home/nifi/data/outfile.txt", 'a')
with open("/home/nifi/data/validation.json",'r') as vfile:
validation = json.loads(vfile.read())
finaldict={}
dict1={}
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
invalidrecord = json.loads(text)
for record in invalidrecord:
#print(finaldict)
finaldict.update(record)
# print(finaldict)
finaldict['errors'] = []
errordict = {}
# loop through each field from validation json
for field in validation:
for element, value in record.items():
if field == element:
dtype = type(value)
dict1 = validation[field]
for validtype in dict1:
if validtype['validationType'] == 'datatype':
if str(dtype) == validtype['check']:
pass
else:
errordict = {"element": field}
errordict.update({"errorCode": validtype['errorCode']})
finaldict["errors"].append(errordict)
if validtype['validationType'] == 'maxlength':
dlen = len(str(value))
if str(dlen) <= str(validtype['check']):
pass
else:
errordict = {"element": field}
errordict.update({"errorCode": validtype['errorCode']})
finaldict["errors"].append(errordict)
else:
pass
json.dump(finaldict, ofile)
outputStream.write(bytearray(ofile.encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, ModJSON())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
В приведенном выше коде outputStream.write(bytearray(ofile.encode('utf-8')))
не записывает outfile.txt в файл потока и выдает:
NullPointerException at session.transfer (flowFile, REL_SUCCESS)
Может кто-нибудь подсказать, пожалуйста, что не так с этим кодом.
Связанный вопрос находится по ссылке ниже, но не смог найти то, что яищу, так как он не отправляет текстовый файл на выходной файл потока nifi.
Доступ к элементу Json и запись в текстовый файл с использованием процессора Python ExecuteScript