Как читать многоуровневый JSON в Pyspark? - PullRequest
0 голосов
/ 18 декабря 2018
**Json Structure is -:**
aa.json

[[{"foo":"test1"},{"foo1":"test21"}],
[{"foo":"test2"},{"foo1":"test22"}],
[{"foo":"test3"},{"foo1":"test23"}]]

Код для чтения DataFrame:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

a=sqlContext.read.option('multiline',"true").json('aa.json');
a.show()
+----+----+
| foo|foo1|
+----+----+
|null|null|
+----+----+

a.printSchema()
root
 |-- foo: string (nullable = true)
 |-- foo1: string (nullable = true)

Вот строки для чтения этого json, он может анализировать схему, но не данные.

1 Ответ

0 голосов
/ 19 декабря 2018

Применение некоторых регулярных выражений и преобразование в rdd может помочь вам в этом.

Сначала прочитайте файл, используя textFile:

a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)
#+-------------------------------------+
#|value                                |
#+-------------------------------------+
#|[[{"foo":"test1"},{"foo1":"test21"}],|
#|[{"foo":"test2"},{"foo1":"test22"}], |
#|[{"foo":"test3"},{"foo1":"test23"}]] |
#+-------------------------------------+

Теперь мы можем использовать pyspark.sql.functions.regexp_replaceубрать лишние квадратные скобки и запятую из каждой строки:

from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", "(^\[(?=\[))|((?<=\])\]$)|(,$)", "").alias("value"))
a.show(truncate=False)
#+-----------------------------------+
#|value                              |
#+-----------------------------------+
#|[{"foo":"test1"},{"foo1":"test21"}]|
#|[{"foo":"test2"},{"foo1":"test22"}]|
#|[{"foo":"test3"},{"foo1":"test23"}]|
#+-----------------------------------+

Шаблон здесь логический или из следующих шаблонов:

  • ^\[(?=\[): начало строкизатем [[ (вторая [ - группа без захвата)
  • (?<=\])\]$: ]] в конце строки (первая ] - группа без захвата)
  • ,$: запятая в конце строки

Все подходящие шаблоны заменяются пустой строкой.

Теперь преобразуйте в rddи используйте json.loads для разбора ваших строк в списки словарей.Затем объедините все эти словари в один словарь и вызовите конструктор pyspark.sql.Row.Наконец, вызовите .toDF для преобразования обратно в DataFrame.

# From `How to merge two dictionaries in a single expression?`
# This code works for python 2 and 3
def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

import json
from pyspark.sql import Row
from functools import reduce 

a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()
#+-----+------+
#|  foo|  foo1|
#+-----+------+
#|test1|test21|
#|test2|test22|
#|test3|test23|
#+-----+------+

Ссылки :

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...