Нифи, данные Json - PullRequest
       12

Нифи, данные Json

0 голосов
/ 24 сентября 2018

У меня есть входные данные json как

{
"type" : "insert",
"timestamp" : 1536959101000,
"binlog_filename" : "mysql-bin-changelog.234234",
"binlog_position" : 223,
"database" : "test",
"table_name" : "demo",
"table_id" : 138,
"columns" : [ {
"id" : 1,
"name" : "id",
"column_type" : 12,
"value" : "IboECKV "
}, {
"id" : 2,
"name" : "col2",
"column_type" : 93,
"value" : "Fri Sep 14 21:05:02 UTC 2018"
}, {
"id" : 3,
"name" : "col3",
"column_type" : 4,
"value" : 10
}, {
"id" : 4,
"name" : "col4",
"column_type" : 4,
"value" : 0
}]
}

Если тип_толбца = 93 (дата / время) :преобразовать значение в: yyyy-MM-dd HH:mm:ss.SSSZ

Таким образом, целевое значение составляет

[ {
 "id" : "IboECKV "
}, {
 "col2" : "2018-09-14 21:05:02.000Z"
}, {
"col3" : 10
}, {
"col4" : 0
}]

Знаете ли вы, как решить этот случай?

Большое спасибо,

1 Ответ

0 голосов
/ 25 сентября 2018

Вы можете использовать ExecuteScript и использовать Groovy, чтобы выполнить синтаксический анализ ввода Json и даты и отформатировать ее в соответствии с желаемым форматом, используя SimpleDateFormat.

Быстрый пример:

import java.text.SimpleDateFormat 
import java.util.Date
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

flowFile = session.get()
if(!flowFile)return
def text = ''

session.read(flowFile, {inputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)

def jsonSlurper = new JsonSlurper()
def object = jsonSlurper.parseText(text)

def columnsSize = object.columns.size

0.upto(columnsSize - 1) {
    if (object.columns[it].column_type == 93 ) {
        oldDate = object.columns[it].value
        sdfmt2= new SimpleDateFormat('dd-M-yyyy')
        parsedDate = sdfmt2.parse(oldDate)
        object.columns[it].value = parsedDate
        output = JsonOutput.toJson(object)

        flowFile = session.write(flowFile, {outputStream ->
            outputStream.write(output.getBytes(StandardCharsets.UTF_8))
        } as OutputStreamCallback)
        session.transfer(flowFile, REL_SUCCESS)
    }
}
...