Применение некоторых регулярных выражений и преобразование в rdd
может помочь вам в этом.
Сначала прочитайте файл, используя textFile
:
a=spark.read.option('multiline',"true").text('aa.json')
a.show(truncate=False)
#+-------------------------------------+
#|value |
#+-------------------------------------+
#|[[{"foo":"test1"},{"foo1":"test21"}],|
#|[{"foo":"test2"},{"foo1":"test22"}], |
#|[{"foo":"test3"},{"foo1":"test23"}]] |
#+-------------------------------------+
Теперь мы можем использовать pyspark.sql.functions.regexp_replace
убрать лишние квадратные скобки и запятую из каждой строки:
from pyspark.sql.functions import regexp_replace
a = a.select(regexp_replace("value", "(^\[(?=\[))|((?<=\])\]$)|(,$)", "").alias("value"))
a.show(truncate=False)
#+-----------------------------------+
#|value |
#+-----------------------------------+
#|[{"foo":"test1"},{"foo1":"test21"}]|
#|[{"foo":"test2"},{"foo1":"test22"}]|
#|[{"foo":"test3"},{"foo1":"test23"}]|
#+-----------------------------------+
Шаблон здесь логический или из следующих шаблонов:
^\[(?=\[)
: начало строкизатем [[
(вторая [
- группа без захвата) (?<=\])\]$
: ]]
в конце строки (первая ]
- группа без захвата) ,$
: запятая в конце строки
Все подходящие шаблоны заменяются пустой строкой.
Теперь преобразуйте в rdd
и используйте json.loads
для разбора ваших строк в списки словарей.Затем объедините все эти словари в один словарь и вызовите конструктор pyspark.sql.Row
.Наконец, вызовите .toDF
для преобразования обратно в DataFrame.
# From `How to merge two dictionaries in a single expression?`
# This code works for python 2 and 3
def merge_two_dicts(x, y):
z = x.copy() # start with x's keys and values
z.update(y) # modifies z with y's keys and values & returns None
return z
import json
from pyspark.sql import Row
from functools import reduce
a.rdd.map(lambda x: Row(**reduce(merge_two_dicts, json.loads(x['value'])))).toDF().show()
#+-----+------+
#| foo| foo1|
#+-----+------+
#|test1|test21|
#|test2|test22|
#|test3|test23|
#+-----+------+
Ссылки :