Игнорировать только неверную запись CSV, используя groovy: Apache -NiFi - PullRequest
0 голосов
/ 12 февраля 2020

Я пытаюсь проверить процесс csv, используя Apache -NiFi. Я использую ExecuteGroovyScript для обработки csv и получения данных.

У моего исходного csv есть проблема, некоторые записи выглядят так:

id,name,age,bd,email,address
1,sachith,29,9,sachith@email.com,{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,nalaka@email.com,{"No": "1","Lane":
"Lane-1"}

здесь 2-я запись недействительна, я хочу удалить только эта запись и остальные процессы.

import groovy.json.*

def ff=session.get()
if(!ff)return

def parser = new JsonSlurper().setType(JsonParserType.LAX)

ff.write{streamIn,streamOut->
    streamIn.withReader('UTF-8'){r->      //convert in stream to reader
        streamOut.withWriter('UTF-8'){w-> //convert out stream to writer
            //go line by line
            r.eachLine{line, lineNum->
                if(lineNum==1){
                    w<<line<<'id,name,age,bd,email,address'<<'\n'        //for the first line just add some headers
                }else{
                    def row=line.split(',')          //split line by coma
                    def json=row[5..-1].join(',')    //join back to string starting from 3rd element
                    json = parser.parseText(json)
                    w<<"${json.id},${json.name},${json.age},${json.bd},${json.email},${json.address}"<<'\n'
                }
            }
        }
    }
}
REL_SUCCESS<<ff

Это было взято из моего предыдущего вопроса .

В основном я хочу просто проигнорировать запись и процесс с другими значениями:

Я ссылался: groovy. json .JsonException: ожидание

Groovy: проверка JSON строка

Но я не понимаю, как интегрировать это в Apache -NiFi поток.

1 Ответ

1 голос
/ 12 февраля 2020

Я согласен, что лучше исправить источник

, однако, если это невозможно, вы можете попытаться найти совпадение, если строка завершена

import groovy.json.*

def parser = new JsonSlurper().setType(JsonParserType.LAX) //LAX to accept strings without double-quotes

def w = System.out
def buf = new StringBuilder() //buffer to collect lines if they are not complete
new StringReader('''id,name,age,bd,email,address
1,sachith,29,9,sachith@email.com,{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,nalaka@email.com,{"No": "1"
,"Lane":"Lane-1"}''').withReader{r->
    r.eachLine{line, lineNum->
        if(lineNum==1){
            w<<line<<'id,name,age,bd,email,address'<<'\n'
        }else{
            buf<<(buf?'\n':'')<<line //append line to previous incomplete line(s)
            if(buf=~/(?s)^\d.*\}$/){
                //normal line: starts with number and ends with }
                def row=buf.toString().split(',')   //split line by coma
                def json=row[5..-1].join(',')       //join back to string starting from 3rd element
                json = parser.parseText(json)
                w<<"${json.No},${json.Lane}"<<'\n'
                buf.setLength(0) //reset buffer
            }
        }
    }
}
...