Вложенный JSON с дублирующимися ключами - PullRequest
1 голос
/ 17 января 2020

Мне придется обрабатывать 10 миллиардов вложенных JSON записей в день с использованием NiFi (версия 1.9). В рамках работы я пытаюсь преобразовать вложенный JSON в csv, используя скрипт Groovy. Я сослался на приведенные ниже вопросы о переполнении стека, относящиеся к той же самой топи c, и придумал приведенный ниже код.

Groovy собрать из карты и подкарты

как полностью преобразовать json в пару значений ключей, используя groovy

Но я не уверен, как получить значение дубликатов ключей. Образец json определен в переменной "json" в приведенном ниже коде. Ключ «Flag1» будет входить в несколько разделов (т. е. «OF» и «SF»). Я хочу получить вывод в виде csv.

Ниже приведен вывод, если я выполню следующий groovy код 2019-10-08 22: 33: 29.244000, v12, -, 36178,0, 0 / 0,10.65.5.56, sf, sf (значение ключа flag1 заменяется последним значением вхождения этого ключевого столбца)

Я не эксперт в Groovy. Также, пожалуйста, предложите, если есть какой-то другой лучший подход, так что я попробую.

    import groovy.json.*

    def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}'

    def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
        def jsonRecord = new JsonSlurper().parseText(jsonReplace)
        def columns = ["TS","V","PID","RS","SR","CnID","CIP","Flag1","Flag1"]

        def flatten
        flatten = { row ->
            def flattened = [:]
            row.each { k, v ->
                if (v instanceof Map) {
                    flattened << flatten(v)
                } else if (v instanceof Collection && v.every {it instanceof Map}) {
                    v.each { flattened << flatten(it) }
                } else {
                    flattened[k] = v
                }
            }
            flattened
        }
        print "output: " + jsonRecord.transaction.collect {row -> columns.collect {colName -> flatten(row)[colName]}.join(',')}.join('\n')

Редактировать: Основываясь на ответе @cfrick и @stck, я попробовал эту опцию и продолжил вопрос ниже.

@ cfrick и @ stck - Спасибо за ваш ответ.

  1. Исходный источник JSON запись будет иметь более 100 столбцов, и я использую "InvokeScriptedProcessor" в NiFi запустить скрипт Groovy.
  2. Ниже приведен оригинальный скрипт Groovy, который я использую в "InvokeScriptedProcessor", в котором я использовал Streams (inputtream, outputtream). Это то, что вы имеете в виду. Я что-то не так делаю?
import groovy.json.JsonSlurper
class customJSONtoCSV implements Processor {
def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();

    def log
    static def flatten(row, prefix="") {
            def flattened = new HashMap<String, String>()
                            row.each { String k, Object v ->
            def key = prefix ? prefix + "_" + k : k;

            if (v instanceof Map) {
                flattened.putAll(flatten(v, k))
            } else {
                flattened.put(key, v.toString())
            }
        }
        return flattened
    }
        static def toCSVRow(HashMap row) {
        def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","DL","I_R","UAP","EDBL","Ca","A","RQM","RSM","FIT","CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","A_S","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]

            return columns.collect { column ->
            return row.containsKey(column) ? row.get(column) : ""
        }.join(',')
    }
    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }
    @Override

    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {

            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))

        def jsonSlurper = new JsonSlurper()
        def line
        def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,DL,I_R,UAP,EDBL,Ca,A,RQM,RSM,FIT,CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,A_S,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"

        outputStream.write("${header}\n".getBytes('UTF-8'))
        while (line = bufferedReader.readLine()) {

        def jsonReplace = line.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
        def jsonRecord = new JsonSlurper().parseText(jsonReplace)
        def a = jsonRecord.transaction.collect { row ->
                return flatten(row)
                }.collect { row ->
                return toCSVRow(row)
                }

        outputStream.write("${a}\n".getBytes('UTF-8'))

        }
 } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override

   List<PropertyDescriptor> getPropertyDescriptors() { 

return [] as List
}
    @Override

    String getIdentifier() { return null }
}
processor = new customJSONtoCSV() 
Если я не должен использовать «собирать», то что еще мне нужно использовать для создания строк. В файле выходного потока вывод записи идет внутрь []. Я попробовал ниже, но это не работает. Не уверен, правильно ли я поступаю. Я хочу вывод в формате csv без []
return toCSVRow(row).toString()

Ответы [ 2 ]

1 голос
/ 17 января 2020

Если вы точно знаете, что вы хотите извлечь (и если вы хотите сгенерировать CSV из него), ИМХО, вам лучше просто формировать данные так, как вы впоследствии захотите их использовать. Например,

def data = new groovy.json.JsonSlurper().parseText('[{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}]')

extractors = [
    { it.TS },
    { it.V },
    { it.PID },
    { it.RS },
    { it.SR },
    { it.CIPG.CIP },
    { it.CIPG.CP },
    { it.OF.Flag1 },
    { it.SF.Flag1 },]

def extract(row) {
    extractors.collect{ it(row) }
}

println(data.collect{extract it})

// ⇒ [[2019-10-08 22:33:29.244000, null, null, null, null, 10.65.5.56, 0, of, sf]]

Как указано в другом ответе, из-за большого количества данных, которые вы пытаетесь преобразовать: *

  • . Убедитесь, что вы используете библиотеку для генерации CSV. файл из этого, иначе вы столкнетесь с проблемами с содержимым, которое вы пытаетесь записать (например, разрывы строк или данные, содержащие символ-разделитель).
  • Не используйте collect (это нетерпеливо), чтобы создать строки.
0 голосов
/ 17 января 2020

Идея состоит в том, чтобы изменить метод "flatten" - он должен различать одни и те же вложенные ключи путем предоставления родительского ключа в качестве префикса. Я немного упростил код:

import groovy.json.*

def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}'
def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}','}}]')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)

static def flatten(row, prefix="") {
    def flattened = new HashMap<String, String>()
    row.each { String k, Object v ->
        def key = prefix ? prefix + "." + k : k;

        if (v instanceof Map) {
            flattened.putAll(flatten(v, k))
        } else {
            flattened.put(key, v.toString())
        }
    }

    return flattened
}

static def toCSVRow(HashMap row) {
    def columns = ["TS","V","PID","RS","SR","CnID","CIP","OF.Flag1","SF.Flag1"] // Last 2 keys have changed!

    return columns.collect { column ->
        return row.containsKey(column) ? row.get(column) : ""
    }.join(', ')
}

def a = jsonRecord.transaction.collect { row ->
    return flatten(row)
}.collect { row ->
    return toCSVRow(row)
}.join('\n')

println a

Вывод будет:

2019-10-08 22:33:29.244000, , , , , , , of, sf
...