Мне придется обрабатывать 10 миллиардов вложенных JSON записей в день с использованием NiFi (версия 1.9). В рамках работы я пытаюсь преобразовать вложенный JSON в csv, используя скрипт Groovy. Я сослался на приведенные ниже вопросы о переполнении стека, относящиеся к той же самой топи c, и придумал приведенный ниже код.
Groovy собрать из карты и подкарты
как полностью преобразовать json в пару значений ключей, используя groovy
Но я не уверен, как получить значение дубликатов ключей. Образец json определен в переменной "json" в приведенном ниже коде. Ключ «Flag1» будет входить в несколько разделов (т. е. «OF» и «SF»). Я хочу получить вывод в виде csv.
Ниже приведен вывод, если я выполню следующий groovy код 2019-10-08 22: 33: 29.244000, v12, -, 36178,0, 0 / 0,10.65.5.56, sf, sf (значение ключа flag1 заменяется последним значением вхождения этого ключевого столбца)
Я не эксперт в Groovy. Также, пожалуйста, предложите, если есть какой-то другой лучший подход, так что я попробую.
import groovy.json.*
def json = '{"transaction":{"TS":"2019-10-08 22:33:29.244000","CIPG":{"CIP":"10.65.5.56","CP":"0"},"OF":{"Flag1":"of","Flag2":"-"},"SF":{"Flag1":"sf","Flag2":"-"}}'
def jsonReplace = json.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)
def columns = ["TS","V","PID","RS","SR","CnID","CIP","Flag1","Flag1"]
def flatten
flatten = { row ->
def flattened = [:]
row.each { k, v ->
if (v instanceof Map) {
flattened << flatten(v)
} else if (v instanceof Collection && v.every {it instanceof Map}) {
v.each { flattened << flatten(it) }
} else {
flattened[k] = v
}
}
flattened
}
print "output: " + jsonRecord.transaction.collect {row -> columns.collect {colName -> flatten(row)[colName]}.join(',')}.join('\n')
Редактировать: Основываясь на ответе @cfrick и @stck, я попробовал эту опцию и продолжил вопрос ниже.
@ cfrick и @ stck - Спасибо за ваш ответ.
- Исходный источник JSON запись будет иметь более 100 столбцов, и я использую "InvokeScriptedProcessor" в NiFi запустить скрипт Groovy.
- Ниже приведен оригинальный скрипт Groovy, который я использую в "InvokeScriptedProcessor", в котором я использовал Streams (inputtream, outputtream). Это то, что вы имеете в виду. Я что-то не так делаю?
import groovy.json.JsonSlurper
class customJSONtoCSV implements Processor {
def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
def log
static def flatten(row, prefix="") {
def flattened = new HashMap<String, String>()
row.each { String k, Object v ->
def key = prefix ? prefix + "_" + k : k;
if (v instanceof Map) {
flattened.putAll(flatten(v, k))
} else {
flattened.put(key, v.toString())
}
}
return flattened
}
static def toCSVRow(HashMap row) {
def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","DL","I_R","UAP","EDBL","Ca","A","RQM","RSM","FIT","CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","A_S","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]
return columns.collect { column ->
return row.containsKey(column) ? row.get(column) : ""
}.join(',')
}
@Override
void initialize(ProcessorInitializationContext context) {
log = context.getLogger()
}
@Override
Set<Relationship> getRelationships() {
return [REL_SUCCESS] as Set
}
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
try {
def session = sessionFactory.createSession()
def flowFile = session.get()
if (!flowFile) return
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
def jsonSlurper = new JsonSlurper()
def line
def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,DL,I_R,UAP,EDBL,Ca,A,RQM,RSM,FIT,CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,A_S,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"
outputStream.write("${header}\n".getBytes('UTF-8'))
while (line = bufferedReader.readLine()) {
def jsonReplace = line.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)
def a = jsonRecord.transaction.collect { row ->
return flatten(row)
}.collect { row ->
return toCSVRow(row)
}
outputStream.write("${a}\n".getBytes('UTF-8'))
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
}
catch (e) {
throw new ProcessException(e)
}
}
@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }
@Override
PropertyDescriptor getPropertyDescriptor(String name) { return null }
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
return [] as List
}
@Override
String getIdentifier() { return null }
}
processor = new customJSONtoCSV()
Если я не должен использовать «собирать», то что еще мне нужно использовать для создания строк. В файле выходного потока вывод записи идет внутрь []. Я попробовал ниже, но это не работает. Не уверен, правильно ли я поступаю. Я хочу вывод в формате csv без []
return toCSVRow(row).toString()