Writing bad JSON records to a separate flow file using Groovy (InvokeScriptedProcessor)
12 February 2020

I want to separate bad JSON records from a flow file so that my NiFi job can continue processing the good JSON records. I looked at the "ValidateRecord" processor, but because the JSON structure itself is malformed in a few records (for example, "CT":"UTF-8""), NiFi routes the entire flow file to the Failure relationship. Since a Groovy script is already used to convert the JSON to CSV, I am thinking of writing the bad records to a separate flow file while parsing in that same Groovy script. I am struggling to modify it, though, as I am new to Groovy. Can anyone help?

If a record fails to parse, it should be written to the flow file routed to the "failure" relationship; otherwise it should go to the flow file routed to "success". Something like:

try {
    def jsonRecord = new JsonSlurper().parseText(jsonReplace)
} catch (e) {
    // write the offending line to the error flow file
}
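The idea in the pseudocode above can be tried outside NiFi first. The following is a minimal sketch, assuming one JSON document per line; the helper name partitionJsonLines and the shortened sample records are hypothetical and only illustrate the try/catch split, not the final processor code.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: parses each line and separates the lines that
// parse cleanly from the ones that make JsonSlurper throw.
def partitionJsonLines(List<String> lines) {
    def slurper = new JsonSlurper()
    def good = []   // parsed records
    def bad = []    // raw text of the lines that failed to parse
    lines.each { line ->
        try {
            good << slurper.parseText(line)
        } catch (Exception e) {
            // Deliberately broad catch: any parse failure marks the line as bad.
            bad << line
        }
    }
    return [good: good, bad: bad]
}

// Shortened stand-ins for the sample records in the question.
def sample = [
    '{"transaction":{"CT":"application/octet-stream"}}',
    '{"transaction":{"CT":"text/javascript; charset="UTF-8""}}'
]

def result = partitionJsonLines(sample)
println "good records: ${result.good.size()}, bad records: ${result.bad.size()}"
```

Keeping the raw line for a bad record, rather than a partially parsed object, means the error output can be inspected or replayed unchanged.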

Sample Bad JSON:

{"transaction":{"TS":"2020-02-04 07:04:57.437002","V":"v8","PID":"p1125","RS":"370","SR":"420","CnID":"0/6","CIPG":{"CIP":"10.67.112.35","CP":"38212","SLP":"38212","SLEP":"38212","CVID":"410"},"SIPG":{"SIP":"54.93.254.234","SP":"80","InP":"8080","SVID":"420"},"TG":{"T":"158836","R":"158836","C":"0","SDL":"0","DL":"0","I:R":"2:0","UAP":"0","EDBL":"0","Ca":"0","A":"0","RQM":"0","RSM":"0","FIT":"0","CSR":"158836"},"AS":"454","OS":"460","CPr":"-1","CVB":"0","CS":"MISS","HS":"200","OF":{"Flag1":"-","Flag2":"-","Flag3":"-","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"-","Flag11":"-","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"-","Flag23":"-"},"SF":{"Flag1":"A","Flag2":"M","Flag3":"H","Flag4":"-","Flag5":"C","Flag6":"-","Flag7":"-","Flag8":"F","Flag9":"-","Flag10":"-","Flag11":"C","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"S","Flag23":"c","Flag24":"-"},"GF":{"Flag1":"r","Flag2":"-","Flag3":"s","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"k","Flag11":"-","Flag12":"S","Flag13":"C","Flag14":"-","Flag15":"-","Flag16":"M","Flag17":"T","Flag18":"A","Flag19":"H","Flag20":"S","Flag21":"H","Flag22":"-","Flag23":"S","Flag24":"-","Flag25":"-","Flag26":"j","Flag27":"-","Flag28":"-","Flag29":"-","Flag30":"L","Flag31":"-","Flag32":"-","Flag33":"-","Flag34":"-","Flag35":"-"},"SUNR":"0","SUNS":"460","ML":"-","VSL":{"VSID":"0","TC":"-","MTC":"-","NRTC":"-","ET":"-","HRES":"0","VRES":"0","FS":"0","FR":"0.0","VSD":"0","ACB":"-","ASB":"-","VPR":"-","VSST":"-"},"MT":"-","TCPSL":"54/54/54 21 1/1 0.0/0.0","CT":"text/javascript; 
charset="UTF-8"","MS":"44XXXXXXXXXX","MSH":"7ogzX50IJtbIy44dRxi9PbNPlwCJ5TppUpX+Dk72TdU=","SID":"v4.172.16.137.150:9EE6E276BA938B46","SuID":"-","UA":"okhttp/3.9.1","DID":"Default-Android-Web-Unknown","UAG":"Android-Web","CID":"TECH","HR":"-","CRG":"3001010006000000","CP1":"8232F401009032F40107ADE76E","CP2":"3544470718998600","AIDF":"0:0","UCB":"0/0","CLID":"-","CLCL":"0","OPTS":"-","PUAG":"Android-Web","SSLIL":"-","HRU":{"HM":"GET","HD":"pubsub.pubnub.com","HP":"/v2/subscribe/sub-c-ee729d78-6233-11e2-b80b-12313f022c90/uk8_16620/0","HQ":"requestid=57e020bb-bc6a-47bf-8329-314f4242a506&heartbeat=300&tr=12&tt=15807995127999216&pnsdk=PubNub-Java-Unified/4.19.0&uuid=a311a598-430f-4758-a75e-a045132c926d"},"URLF":{"CID":"-","CGID":"-","CR":"-","RA":"-","USM":"-","USP":"-","MUS":"-"},"TCPSt":{"WS":"0","SE":"0","WSFNS":"0","WSF":"0","EM":"0","RSTE":"0","MSS":"0"},"NS":{"OPID":"-","ODID":"-","EPID":"-","TrID":"-","VSN":"-","LSUT":"-","STTS":"-","TCPPR":"-"},"CQA":{"NL":"-","CL":"-","CLC":"-","SQ":"-","SQC":"-"}}}

Sample good JSON:

{"transaction":{"TS":"2020-02-04 07:04:59.942141","V":"v8","PID":"p36489","RS":"578","SR":"649","CnID":"0/1","CIPG":{"CIP":"10.65.204.71","CP":"33602","SLP":"33602","SLEP":"33602","CVID":"410"},"SIPG":{"SIP":"5.62.38.137","SP":"80","InP":"80","SVID":"420"},"TG":{"T":"363","R":"363","C":"0","SDL":"0","DL":"0","I:R":"2:0","UAP":"0","EDBL":"0","Ca":"0","A":"0","RQM":"0","RSM":"0","FIT":"0","CSR":"362"},"AS":"254","OS":"254","CPr":"0","CVB":"0","CS":"MISS","HS":"200","OF":{"Flag1":"-","Flag2":"-","Flag3":"-","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"-","Flag11":"-","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"-","Flag23":"-"},"SF":{"Flag1":"A","Flag2":"M","Flag3":"H","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"F","Flag9":"-","Flag10":"-","Flag11":"C","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"F","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"G","Flag23":"G","Flag24":"-"},"GF":{"Flag1":"n","Flag2":"-","Flag3":"b","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"k","Flag11":"-","Flag12":"S","Flag13":"C","Flag14":"-","Flag15":"-","Flag16":"M","Flag17":"T","Flag18":"A","Flag19":"!","Flag20":"-","Flag21":"H","Flag22":"-","Flag23":"S","Flag24":"-","Flag25":"-","Flag26":"5","Flag27":"-","Flag28":"-","Flag29":"-","Flag30":"L","Flag31":"-","Flag32":"-","Flag33":"-","Flag34":"-","Flag35":"-"},"SUNR":"0","SUNS":"254","ML":"-","VSL":{"VSID":"0","TC":"-","MTC":"-","NRTC":"-","ET":"-","HRES":"0","VRES":"0","FS":"0","FR":"0.0","VSD":"0","ACB":"-","ASB":"-","VPR":"-","VSST":"-"},"MT":"-","TCPSL":"30/30/30 21 1/1 0.0/0.0","CT":"application/octet-stream","MS":"44XXXXXXXXXX","MSH":"rPD7X2tPXvDdbRPEeEjIlvpggJcu/UPStkSEgxepvB8=","SID":"v4.172.16.137.150:9EE6E2767E92B7A2","SuID":"-","UA":"Apache-HttpClient/UNAVAILABLE (Java/0)","DID":"Jakarta 
Commons-HttpClient#2#App#Library-Utility","UAG":"Laptop-Web","CID":"TECH","HR":"-","CRG":"3001010006000000","CP1":"8232F40100A032F40107C8667D","CP2":"3528020922700601","AIDF":"6:1191","UCB":"0/0","CLID":"-","CLCL":"0","OPTS":"-","PUAG":"Laptop-Web","SSLIL":"-","HRU":{"HM":"POST","HD":"ui.ff.avast.com","HP":"/v5/urlinfo/_MD/0073AE105208F6A80000017019A3861E30653939333461620000016F7DF63D19/1580799898571","HQ":""},"URLF":{"CID":"-","CGID":"-","CR":"-","RA":"-","USM":"-","USP":"-","MUS":"-"},"TCPSt":{"WS":"0","SE":"0","WSFNS":"0","WSF":"0","EM":"0","RSTE":"0","MSS":"0"},"NS":{"OPID":"-","ODID":"-","EPID":"-","TrID":"-","VSN":"-","LSUT":"-","STTS":"-","TCPPR":"-"},"CQA":{"NL":"-","CL":"-","CLC":"-","SQ":"-","SQC":"-"}}}

Groovy code

import groovy.json.JsonSlurper
import groovy.json.JsonParserType
import java.util.function.*
import java.lang.*

class customJSONtoCSV implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that were not successfully processed").build()

    def log

    // Flattens a nested map into a single level; a nested value gets the key "<parent>_<child>".
    static def flatten(row, prefix = "") {
        def flattened = new HashMap<String, String>()
        row.each { String k, Object v ->
            def key = prefix ? prefix + "_" + k : k

            if (v instanceof Map) {
                flattened.putAll(flatten(v, k))
            } else {
                flattened.put(key, '"' + v.toString() + '"')
            }
        }

        return flattened
    }

    // Renders one flattened record as a pipe-delimited row in a fixed column order.
    static def toCSVRow(HashMap row) {
        def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","TG_DL","TG_I:R","TG_UAP","TG_EDBL","TG_Ca","TG_A","TG_RQM","TG_RSM","TG_FIT","TG_CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","AS","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]

        return columns.collect { column ->
            return row.containsKey(column) ? row.get(column) : ""
        }.join('|')
    }

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS, REL_FAILURE] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {

            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))

                        def jsonSlurper = new JsonSlurper()
                        def line
        def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,TG_DL,TG_IR,TG_UAP,TG_EDBL,TG_Ca,TG_A,TG_RQM,TG_RSM,TG_FIT,TG_CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,AS,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"

                        outputStream.write("${header}\n".getBytes('UTF-8'))

                        while (line = bufferedReader.readLine()) {
                            def jsonReplace = line.replace('{"transaction":{', '{"transaction":[{').replace('}}}', '}}]}')
                            def jsonRecord = new JsonSlurper().parseText(jsonReplace)

                            def a = jsonRecord.transaction.collect { row ->
                                return flatten(row)
                            }.collect { row ->
                                return toCSVRow(row).toString()
                            }
                            String b = a.toString().replace('[', '').replace(']', '')
                            outputStream.write("${b}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        return [] as List
    }

    @Override
    String getIdentifier() { return null }
}
processor = new customJSONtoCSV()

1 Answer

12 February 2020

Here is an example of how to write two files to different relationships with the ExecuteGroovyScript processor.

Suppose the input is a text file with one number per line:

11
22
33
44

The following Groovy code splits the incoming file in two: odd numbers go to REL_SUCCESS and even numbers go to REL_FAILURE.

ff=session.get()
if(!ff)return

def ff2 = ff.clone(false)

ff.write{streamIn,streamOut->
    streamOut.withWriter("UTF-8"){w->
        streamIn.eachLine("UTF-8"){line->
            if(line.toInteger()%2){
                w.append(line + '\n')
            }else{
                ff2.append("UTF-8", line + '\n')
            }
        }
    }
}

REL_SUCCESS<<ff

//transfer ff2 only if not empty
if(ff2.getSize()>0){
    REL_FAILURE<<ff2 
}else{
    ff2.remove()
}
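To check the routing logic without a running NiFi instance, the same odd/even split can be reproduced in plain Groovy. This is only a sketch: the two StringBuilder buffers are stand-ins (an assumption for illustration) for the content of ff and ff2, and no NiFi classes are involved.

```groovy
// Same sample input as above, one number per line.
def input = '11\n22\n33\n44\n'

def success = new StringBuilder()  // stands in for the content of ff
def failure = new StringBuilder()  // stands in for the content of ff2

input.eachLine { line ->
    if (line.toInteger() % 2) {
        // A non-zero remainder is truthy in Groovy, so odd numbers land here.
        success << line << '\n'
    } else {
        failure << line << '\n'
    }
}

println "success:\n${success}"
println "failure:\n${failure}"
```

For the JSON case, the odd/even test would be replaced by a try/catch around the parse call, with the catch branch appending the raw line to the second flow file.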

You can rewrite this code for InvokeScriptedProcessor, but it will only get longer (IMHO) ...

...