Как передать параметр в скрипт Python в Nifi - PullRequest
0 голосов
/ 02 октября 2018

Может быть, это глупый вопрос, но я должен спросить.

У меня есть процессор Collect_data в Nifi, и он передает сообщения в другой процесс, который использует скрипт Python для анализа этого и создания файла JSON.Проблема в том, что я не знаю, что вводится для функции в скрипте Python.Как передать эти сообщения (16-значные числа) из процессора Collect_data в следующий процессор содержит скрипт Python.Есть ли какой-нибудь хороший, базовый пример по этому поводу?

Я уже искал несколько примеров в Интернете, но не получил их.

import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time


def parse_zap(inputStream, outputStream):
    data = inputStream
    buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
    buf = int(buf, 16)
    buf_check = str(buf)
    if buf_check[17] == 2:
        pass
    datetime_now = datetime.now()
    log_date = datetime_now.isoformat()
    try:
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
        "user": {
            "guruq" : 'false'
        },
        "device" : {
            "type" : "siolbox",
            "mac": mac
        },
        "event" : {
            "origin" : "iptv",
            "timestamp": time_a,
            "type": "zap",
            "product-type" : "tv-channel",
            "channel": {
                "id" : 'channel_id',
                "ams-id": ams_id
            },
            "content": {
                "action": action
            }
        }
        }
        return dict_test
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))

Благодарю, что я правильно читаю, но сейчасЯ не могу создать вывод.Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 03 октября 2018

Взгляните на этот скрипт:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

Из свойства берется количество строк, которые нужно удалить из файла потока, а затем возьмите файл потока и напишите его снова без этих строк, это легкои хороший пример того, как использовать свойства и как использовать файл потока.

Исходя из обновленного кода, ваш код должен выглядеть следующим образом:

import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time


class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    data = inputStream
    buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
    buf = int(buf, 16)
    buf_check = str(buf)
    if buf_check[17] == 2:
        pass
    datetime_now = datetime.now()
    log_date = datetime_now.isoformat()
    try:
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
        "user": {
            "guruq" : 'false'
        },
        "device" : {
            "type" : "siolbox",
            "mac": mac
        },
        "event" : {
            "origin" : "iptv",
            "timestamp": time_a,
            "type": "zap",
            "product-type" : "tv-channel",
            "channel": {
                "id" : 'channel_id',
                "ams-id": ams_id
            },
            "content": {
                "action": action
            }
        }
        }
        return dict_test
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)        
0 голосов
/ 03 октября 2018

Мне кажется, я понимаю ваш вопрос, но он несколько двусмыслен в отношении вашего потока.Я отвечаю за несколько возможных сценариев.

  1. У вас есть процессор, который получает данные из источника (то есть FetchFTP) и имеет подключение к процессору ExecuteScript, который содержит скрипт Python для преобразования этих значений.В этом случае скрипт Python может работать с атрибутами и содержимым потокового файла напрямую, используя стандартный API.См. блог Мэтта Берджесса , где приведено множество примеров написания пользовательских сценариев для работы с данными.
  2. У вас есть процессор, который получает данные из источника и имеет подключение к процессору ExecuteStreamCommand, который вызывает внешний скрипт Python, используя команду, подобную python my_external_script.py arg1 arg2 ....В этом случае содержимое потокового файла передается STDIN процессором ExecuteStreamCommand, поэтому ваш сценарий должен использовать его таким образом. Этот ответ объясняет больше об использовании ExecuteStreamCommand со скриптами Python.
  3. У вас есть собственный процессор, который внутренне вызывает отдельный процесс Python.Это плохая идея, и она должна быть переделана под одну из других моделей.Это нарушает разделение проблем, теряет поддержку жизненного цикла процессора, затеняет обработку потоков и время, теряет видимость провенанса и идет вразрез с моделью разработки NiFi.

Если ваш скрипт на Python очень прост, вы можете поместить его в ScriptedRecordWriter и использовать его для одновременной обработки нескольких «записей», чтобы получить выигрыш в производительности.Это может быть расширено для вашего варианта использования, в зависимости от того, как выглядит ваш поток и входящие данные.

Обновление 2018-10-03 10: 50

Попробуйте использовать этот сценарий в теле ExecuteScript:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        result = parse_zap(text)

        outputStream.write(bytearray(result.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "parsed_zap", "true")
    session.transfer(flowFile, REL_SUCCESS)

// Your parse_zap() method here, with the signature changed to just accept a single string
...
...