Каков оптимальный способ синтаксического анализа следующего сообщения Кафки JSON в фрейм данных pyspark? - PullRequest
0 голосов
/ 27 августа 2018

Я использую структурированную потоковую передачу в формате spark для чтения темы kafka и хочу преобразовать следующий сложный JSON (kafka-msgs) в кадр данных, содержащий столбцы «NAME, ADDRESS, DESCRIPTION, CODE, DEPARTMENT, INFA_OP_TYPE, DTL__CAPXTIMESTAMP».

{ 
  "meta_data": [{"name":{"string":"INFA_SEQUENCE"},"value": 
{"string":"2,PWX_GENERIC"},"type":null},
          {"name":{"string":"INFA_TABLE_NAME"},"value":{"string":"customers"},"type":null},
          {"name":{"string":"INFA_OP_TYPE"},"value":{"string":"INSERT_EVENT"},"type":null},
          {"name":{"string":"DTL__CAPXRESTART1"},"value":{"string":"B+IABwAfA"},"type":null},
          {"name":{"string":"DTL__CAPXRESTART2"},"value":{"string":"AAABpMwgRDk="},"type":null},
          {"name":{"string":"DTL__CAPXUOW"},"value":{"string":"AAMKPgAAqaIABg=="},"type":null},
          {"name":{"string":"DTL__CAPXUSER"},"value":null,"type":null},
          {"name":{"string":"DTL__CAPXTIMESTAMP"},"value":{"string":"201807310934257270000000"},"type":null},
          {"name":{"string":"DTL__CAPXACTION"},"value":{"string":"I"},"type":null}],
"columns":{"array":[{"name":{"string":"NAME"},"value":{"string":"ABCD"},"isPresent":{"boolean":true}},
                  {"name":{"string":"ADDRESS"},"value":{"string":"123,Bark street"},"isPresent":{"boolean":true}},
                  {"name":{"string":"DESCRIPTION"},"value":{"string":"Canadian"},"isPresent":{"boolean":true}},
                  {"name":{"string":"CODE"},"value":{"string":"3_1"},"isPresent":{"boolean":true}},
                  {"name":{"string":"DEPARTMENT"},"value":{"string":"HR"},"isPresent":{"boolean":true}}
                 ]     }
}

Я могу извлечь два объекта json "meta_data" и "columns", но не могу взорвать "columns.array"

newJsonObj = events.select(get_json_object(events.value,'$.meta_data').alias('meta_data'),get_json_object(events.value,'$.columns.array').alias('columns'))

И я не знаю, как извлечь значения из двух объектов json и создать фрейм данных, имеющий столбцы из обоих объектов json.

- Схема кадра данных событий -

root
|-- columns: struct (nullable = true)
|    |-- array: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- isPresent: struct (nullable = true)
|    |    |    |    |-- boolean: boolean (nullable = true)
|    |    |    |-- name: struct (nullable = true)
|    |    |    |    |-- string: string (nullable = true)
|    |    |    |-- value: struct (nullable = true)
|    |    |    |    |-- string: string (nullable = true)
|-- meta_data: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- name: struct (nullable = true)
|    |    |    |-- string: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |    |-- value: struct (nullable = true)
|    |    |    |-- string: string (nullable = true)
...