Конвертировать столбец строк в словари в pyspark sql dataframe - PullRequest
1 голос
/ 02 марта 2020

Мне нужно работать с форматом файла, где каждая строка является объектом json. Например:

{'Attribute 1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}
{'Attribute 1': 'B', 'Attribute 2': 2.0, 'Attribute 3': ['A'], 'Attribute 4': {'A': 4}}
{'Attribute 1': 'C', 'Attribute 2': 1.7, 'Attribute 3': ['A','C'], 'Attribute 4': {'A': 3}}

Обратите внимание, что это недопустимый формат файла json, поскольку он не заключен в массив. Кроме того, фактические структуры гораздо больше и больше вложенных. Эти файлы распространяются в s3. Раньше я использовал только паркет или CSV, поэтому я не уверен, как читать эти файлы.

В настоящее время я пишу процесс для объединения этих данных с несколькими другими таблицами, и, поскольку данные велики и находится в s3, я использую pyspark. sql в кластере emr для выполнения операций. Я могу создать таблицу с одним столбцом, содержащим объекты в виде строк, используя:

from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
sqlContext = SQLContext(sc)

schema = StructType([
    StructField('json_format', StringType())
])

context = sqlContext.read
context = context.schema(schema)

df = context.load(
    folder_path,
    format='com.databricks.spark.csv',
    delimiter=','
)
df.createOrReplaceTempView('my_table')

Как я могу преобразовать этот столбец в словарь, где я могу получить доступ к различным атрибутам? Есть ли эквивалент лямбда-функции?

1 Ответ

1 голос
/ 02 марта 2020

Чтобы сделать действительным json объект, мы можем заменить все ' на ", затем с помощью функции get_json_object() мы можем получить доступ к атрибутам.

Example:

df=sqlContext.sql("""select string("{'Attribute1': 'A', 'Attribute 2': 1.5, 'Attribute 3': ['A','B','C'], 'Attribute 4': {'A': 5}}") as str""")

#replacing ' with " using regexp_replace
df=df.withColumn("str",regexp_replace(col("str"),"\'","\""))
df.show(10,False)

#+----------------------------------------------------------------------------------------------+
#|str                                                                                           |
#+----------------------------------------------------------------------------------------------+
#|{"Attribute1": "A", "Attribute 2": 1.5, "Attribute 3": ["A","B","C"], "Attribute 4": {"A": 5}}|
#+----------------------------------------------------------------------------------------------+

#registering temp table
df.registerTempTable("tt")

#accessing Attribute 3
sqlContext.sql("select get_json_object(str,'$.Attribute 3') from tt").show()
#+-------------+
#|          _c0|
#+-------------+
#|["A","B","C"]|
#+-------------+

#accessing first array element from Attribute 3
sqlContext.sql("select get_json_object(str,'$.Attribute 3[0]') from tt").show()
#+---+
#|_c0|
#+---+
#|  A|
#+---+

#accessing Attribute 2
sqlContext.sql("select get_json_object(str,'$.Attribute 2') from tt").show()
#+---+
#|_c0|
#+---+
#|1.5|
#+---+
...