Разбираем сообщения с python в Nifi и сохраняем как json - PullRequest
0 голосов
/ 03 октября 2018

Я пытаюсь прочитать сообщения (16-значное число) от процессора Nifi (Collected_data) в другом процессоре Nifi, содержащем скрипт Python для синтаксического анализа его в json (словарь Python), и мне нужно сохранить его как json, каждое сообщение вотдельный файл JSON.

Код:

import datetime
import hashlib
import sys
from urlparse import urlparse, parse_qs
from datetime import *
from time import time
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(iself, inputStream, outputStream):
        data = IOUtils.readLines(inputStream)
        buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
        buf = int(buf, 16)
        buf_check = str(buf)
        if buf_check[17] == 2:
            pass
        datetime_now = datetime.now()
        log_date = datetime_now.isoformat()
        try:
            mac = buf_check[16:22].upper()
            ob_id = buf_check[4:]
            action = buf_check[1:2]
            time_a = int(time())
            dict_test = {
            "user": {
                "gur" : 'false'
            },
            "device" : {
                "type" : "box",
                "mac": mac
            },
            "event" : {
                "origin" : "tv",
                "timestamp": time_a,
                "type": "zap",
                "product-type" : "radiochannel",
                "channel": {
                    "id" : 'channel_id',
                    "ob-id": ob_id
                },
                "content": {
                    "action": action
                }
            }
            }
            outputStream.write(bytearray(json.dumps(dict_test, indent=4).encode('utf-8')))
            return dict_test
        except Exception as e:
            print('%s nod PARSE 500 \"%s\"' % (log_date, e))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
    session.transfer(flowFile, REL_SUCCESS)

Это отправляется в процесс PutFile в Nifi, и что каждое сообщение должно быть сохранено как файл с проанализированным json внутри.И я создаю файл 16-значный + "translation.json", но в этом файле следующий:

^A^D^@7^A^@^A ^@^@^Q<9e>^@^@    <99>^@^D0<94>À6^A 

Любая идея, подсказка или предложение приветствуется!Заранее спасибо.

...