Как получить тестовый файл для вывода потокового файла в обработчике исполняемого скрипта nifi - PullRequest
0 голосов
/ 27 сентября 2019

Я пытаюсь написать текстовый файл, содержащий словарь для обратного вызова outputStream в процессоре Nifi ExcuteScript.Вот мой код, но он не работает должным образом -

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import os
import java.io.FileOutputStream

ofile=open("/home/nifi/data/outfile.txt", 'a')
with open("/home/nifi/data/validation.json",'r') as vfile:
    validation = json.loads(vfile.read())
finaldict={}
dict1={}

class ModJSON(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    invalidrecord = json.loads(text)

    for record in invalidrecord:
        #print(finaldict)
        finaldict.update(record)
        # print(finaldict)
        finaldict['errors'] = []
        errordict = {}
        # loop through each field from validation json
        for field in validation:
            for element, value in record.items():
                if field == element:
                    dtype = type(value)
                    dict1 = validation[field]
                    for validtype in dict1:
                        if validtype['validationType'] == 'datatype':
                            if str(dtype) == validtype['check']:
                                pass
                            else:
                                errordict = {"element": field}
                                errordict.update({"errorCode": validtype['errorCode']})
                                finaldict["errors"].append(errordict)
                        if validtype['validationType'] == 'maxlength':
                            dlen = len(str(value))
                            if str(dlen) <= str(validtype['check']):
                                pass
                            else:
                                errordict = {"element": field}
                                errordict.update({"errorCode": validtype['errorCode']})
                                finaldict["errors"].append(errordict)
                else:
                    pass
        json.dump(finaldict, ofile)

    outputStream.write(bytearray(ofile.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile, ModJSON())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()

В приведенном выше коде outputStream.write(bytearray(ofile.encode('utf-8'))) не записывает outfile.txt в файл потока и выдает:

NullPointerException at session.transfer (flowFile, REL_SUCCESS)

Может кто-нибудь подсказать, пожалуйста, что не так с этим кодом.

Связанный вопрос находится по ссылке ниже, но не смог найти то, что яищу, так как он не отправляет текстовый файл на выходной файл потока nifi.

Доступ к элементу Json и запись в текстовый файл с использованием процессора Python ExecuteScript

...