У вас есть как минимум 3 варианта:
Вариант 1:
Вам не нужно использовать дополнительные библиотеки, такие как fastparquet
, так как Spark обеспечивает это функциональность уже:
pdf = pd.DataFrame({
"foo": [1, 2, 3],
"bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})
df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")
Если попытаться загрузить данные с помощью df = spark.read.parquet("/tmp/parquet1")
, схема будет иметь вид:
StructType([
StructField("foo", LongType(), True),
StructField("bar",MapType(StringType(), LongType(), True), True)])
Как вы можете видеть в этом случае, Spark сохранит правильную схему.
Вариант 2:
Если по какой-либо причине по-прежнему необходимо использовать fastparquet
, тогда bar
будет считаться строкой, поэтому вы можете загрузить bar
как и затем преобразовать его в JSON, используя функцию from_ json. В вашем случае мы будем обрабатывать json как словарь Map (string, int). Это для нашего собственного удобства, так как данные представляются последовательностью ключ / значение, которая может быть идеально представлена словарем:
from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json
df = spark.read.parquet("/tmp/parquet1")
# schema should be a Map(string, string)
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 1|[a -> 1, b -> 10]|
# | 2|[a -> 2, b -> 20]|
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+
Опция 3:
Если ваша схема не изменяется и вы знаете, что каждое значение бара всегда будет иметь одинаковую комбинацию полей (a, b), вы также можете преобразовать bar
в структуру:
schema = StructType([
StructField("a", LongType(), True),
StructField("b", LongType(), True)
])
df = df.withColumn("bar", from_json("bar", schema))
df.printSchema()
# root
# |-- foo: long (nullable = true)
# |-- bar: struct (nullable = true)
# | |-- a: long (nullable = true)
# | |-- b: long (nullable = true)
Пример:
Затем вы можете запустить свой код с помощью:
df.registerTempTable('my_toy_table')
spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+