Конвертировать массив Json в CSV, используя Apache Nifi - PullRequest
0 голосов
/ 23 марта 2019

Я хочу преобразовать JSON с массивом в формат CSV. Количество элементов в массиве является динамическим для каждой строки. Я попытался использовать этот поток (прикрепил файл потока XML на пост).

GetFile -> ConvertRecord -> UpdateAttribute -> PutFile

Есть ли другие альтернативы?

JSON формат:

{  "LogData": {
"Location": "APAC",
"product": "w1"  },  "Outcome": [
{
  "limit": "0",
  "pri": "3",
  "result": "pass"
},
{
  "limit": "1",
  "pri": "2",
  "result": "pass"
},
{
  "limit": "5",
  "priority": "1",
  "result": "fail"
}  ],  "attr": {
"vers": "1",
"datetime": "2018-01-10 00:36:00"  }}

Ожидаемый результат в CSV:

location,   product,    limit,  pri,    result, vers,   datetime
APAC        w1          0       3       pass    1       2018-01-10 00:36:00
APAC        w1          1       2       pass    1       2018-01-10 00:36:00
APAC        w1          5       1       fail    1       2018-01-10 00:36:00

Выход из присоединенного потока: LogData, Исход, атр "MapRecord [{product = w1, Location = APAC}]", "[MapRecord [{limit = 0, result = pass, pri = 3}], MapRecord [{limit = 1, result = pass, pri = 2}] , MapRecord [{limit = 5, result = fail}]] "," MapRecord [{datetime = 2018-01-10 00:36:00, vers = 1}] "

enter image description here

ConvertRecord - я использую конфигурации JSONTreereader и CSVRecordSSetwriter, как показано ниже: enter image description here

Конфигурация службы JSONTreereader Controler: enter image description here Конфигурация службы контроллера CSVRecordSetwriter: enter image description here Конфигурация службы контроллера AvroschemaRegistry: enter image description here

Авро схема: {"name": "myschema", "type": "record", "namespace": "myschema", "fields": [{"name": "LogData", "type": {"name": "LogData "," type ":" record "," fields ": [{" name ":" Location "," type ":" string "}, {" name ":" product "," type ":" string "} ]}}, {"name": "Outcome", "type": {"type": "array", "items": {"name": "Outcome_record", "type": "record", "fields" : [{"name": "limit", "type": "string"}, {"name": "pri", "type": ["string", "null"]}, {"name": " результат "," тип ":" строка "}]}}}, {" имя ":" атрибут "," тип ": {" имя ":" атрибут "," тип ":" запись "," поля ": [{"name": "vers", "type": "string"}, {"name": "datetime", "type": "string"}]}}]}

Ответы [ 2 ]

1 голос
/ 26 марта 2019

Попробуйте эту спецификацию в JoltTransformJSON перед ConvertRecord:

  {
    "operation": "shift",
    "spec": {
      "Outcome": {
        "*": {
          "@(3,LogData.Location)": "[#2].location",
          "@(3,LogData.product)": "[#2].product",
          "@(3,attr.vers)": "[#2].vers",
          "@(3,attr.datetime)": "[#2].datetime",
          "*": "[#2].&"
        }
      }
    }
  }
]```
1 голос
/ 24 марта 2019

Кажется, что вам нужно выполнить JoltTransform перед преобразованием в CSV, если не собирается работать.

...