Работа с неоднородными столбцами JSON в искровом фрейме - PullRequest
1 голос
/ 30 сентября 2019

Я хотел бы знать, как лучше всего читать файл JSON с разделителями новой строки в кадре данных. Важно то, что одно из (обязательных) полей в каждой записи отображается на объект, который не обязательно имеет одинаковые подполя (т. Е. Схема неоднородна по всем записям).

Например,входной файл может выглядеть следующим образом:

{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}
{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}
{"id": 3, "type": "baz", "data": {"key3": "moo"}}

В этом случае поля id, type и data будут присутствовать во всех записях, но структура, сопоставленная с data, будетимеют гетерогенную схему.

У меня есть два подхода к решению проблемы неравномерности столбца data:

  1. позвольте свече вывести схему:
df = spark.read.options(samplingRatio=1.0).json('s3://bucket/path/to/newline_separated_json.txt')

Очевидным недостатком этого подхода является необходимость выборки каждой записи для определения супер-набора полей / схем, которые будут окончательной схемой. Это может быть слишком дорого, учитывая набор данных в сотнях миллионов записей? Или ...

Скажите spark, чтобы преобразовать поле данных в строку JSON, а затем просто иметь схему, состоящую из трех строковых полей верхнего уровня, id, type, data. И здесь я не совсем уверен, что лучший способ продолжить. Например, я предполагаю, что простое объявление поля data строкой, как показано ниже, не будет работать, потому что оно явно не делает эквивалент json.dumps?
schema = StructType([
    StructField("id", StringType(), true),
    StructField("type", StringType(), true),
    StructField("data", StringType(), true)
])
df = spark.read.json('s3://bucket/path/to/newline_separated_json.txt', schema=schema)

ЕслиЯ хочу избежать затрат на сканирование полного набора данных, вызванного вариантом 1, как лучше всего проглотить этот файл и сохранить поле data в виде строки JSON?

Спасибо

1 Ответ

1 голос
/ 01 октября 2019

Я думаю, что ваша попытка и общая идея в правильном направлении. Вот еще два подхода, основанные на встроенных параметрах, называемых get_json_object / from_json через API-интерфейс DataFrame, и использовании преобразования map вместе с json.dumps() и json.loads() в Python через RDD API.

Вариант 1: get_json_object () / from_json ()

Сначала давайте попробуем get_json_object(), который не требует схемы:

import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

df.select(f.get_json_object("value", "$.id").alias("id"), \
          f.get_json_object("value", "$.type").alias("type"), \
           f.get_json_object("value", "$.data").alias("data"))

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

Напротив from_json() требуется определение схемы:

from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

df.select(f.from_json("value", schema).getItem("id").alias("id"), \
         f.from_json("value", schema).getItem("type").alias("type"), \
         f.from_json("value", schema).getItem("data").alias("data"))

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

Опция 2: API map / RDD + json.dumps ()

from pyspark.sql.types import StringType, StructType, StructField
import json

df = spark.createDataFrame([
  '{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
  '{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
  '{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
], StringType())

def from_json(data):
  row = json.loads(data[0])
  return (row['id'], row['type'], json.dumps(row['data']))

json_rdd = df.rdd.map(from_json)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

spark.createDataFrame(json_rdd, schema).show(10, False)

# +---+----+--------------------------------+
# |id |type|data                            |
# +---+----+--------------------------------+
# |1  |foo |{"key2": "meh", "key0": "foo"}  |
# |2  |bar |{"key2": "poo", "key3": "pants"}|
# |3  |baz |{"key3": "moo"}                 |
# +---+----+--------------------------------+

Функция from_json преобразует строку строки в кортеж (id, type, data). json.loads () проанализирует строку json и вернет словарь, через который мы создадим и вернем финальный кортеж.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...