Прочитать файл словарей как фрейм данных pyspark - PullRequest
0 голосов
/ 28 февраля 2020

Проблема, с которой я сталкиваюсь, состоит в том, что у меня есть файл (или несколько файлов), заполненный словарями, и я пытаюсь затем попасть в информационный кадр. Входной файл может выглядеть следующим образом:

{"A":"value1", "B":"value2"}
{"A":"value2", "B":"value3"}
{"A":"value4", "B":"value5", "C":"value6"}

Проблемы, с которыми я сталкиваюсь:

  • Словари не разделены новой строкой, запятой или чем-то еще. Это однострочный файл, и, к сожалению, я ничего не могу с этим поделать.
  • Словари могут иметь разное количество ключей. Но у меня есть схема конечного кадра данных.

В приведенном выше примере желаемый результат будет:

A          B          C
value1     value2     null
value2     value3     null
value4     value5     value6

То, что я пробовал до сих пор:

spark_sql_context.read.json(path_to_file)

Это читает только первый словарь и возвращает фрейм данных pyspark с одной строкой. Я также попытался прочитать его как текстовый файл:

data_rdd = spark_context.textFile(path_to_file)

Проблема в том, что я не знаю:

  1. как разбить строку, так как нет словаря между словарями и Словари
  2. имеют разную длину.

Буду признателен, если вы укажете мне метод или решение этой проблемы.

1 Ответ

1 голос
/ 29 февраля 2020

Вы можете прочитать его как текст, а затем разделить на }{, чтобы получить массив JSON объектов. Для этого сначала мы заменим }{ на };{, а затем разделим на ;.

df = spark.read.text(path)
df = df.withColumn("values", explode(split(regexp_replace(col("value"), "\\}\\{", "\\};\\{"), ";")))

df.show()

#+------------------------------------------+
#|value                                     |
#+------------------------------------------+
#|{"A":"value1", "B":"value2"}              |
#|{"A":"value2", "B":"value3"}              |
#|{"A":"value4", "B":"value5", "C":"value6"}|
#+------------------------------------------+

Теперь используйте from_json с вашим schema для анализа json для структурирования:

schema = StructType([StructField("A", StringType(), True),
                     StructField("B", StringType(), True),
                     StructField("C", StringType(), True)
                    ])

df = df.withColumn("value", from_json(col("value"), schema)).select("value.*")

df.show()

#+------+------+------+
#|A     |B     |C     |
#+------+------+------+
#|value1|value2|null  |
#|value2|value3|null  |
#|value4|value5|value6|
#+------+------+------+
...