В версии 2.4 можно использовать map_from_arrays
для построения карт даты-значения при агрегировании значений для каждой акции.Тогда это просто вопрос использования create_map
, чтобы использовать символ тикера в качестве ключа.В этом примере используется ChainMap
из Python 3.4 для построения окончательной структуры dict, как вы описали.
import json
from collections import ChainMap
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession \
.builder \
.appName("example") \
.getOrCreate()
df = spark.createDataFrame([
(1388534400, "GOOG", 50),
(1388534400, "FB", 60),
(1388534400, "MSFT", 55),
(1388620800, "GOOG", 52),
(1388620800, "FB", 61),
(1388620800, "MSFT", 55)]
).toDF("date", "stock", "price")
out = df.groupBy("stock") \
.agg(
map_from_arrays(
collect_list("date"), collect_list("price")).alias("values")) \
.select(create_map("stock", "values").alias("values")) \
.rdd.flatMap(lambda x: x) \
.collect()
print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))
Что дает:
{
"FB": {
"1388534400": 60,
"1388620800": 61
},
"GOOG": {
"1388534400": 50,
"1388620800": 52
},
"MSFT": {
"1388534400": 55,
"1388620800": 55
}
}
Однако , как выскажем, у вас есть много данных, которые вы, вероятно, на самом деле не хотите создавать в словаре, так что, вероятно, вам лучше разделить их и записать одну и ту же структуру словаря в файлы для разных разделов.
Давайтесделайте это, укоротив даты до данного месяца и написав отдельный файл для каждого месяца и для каждой акции:
out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \
.agg(
map_from_arrays(
collect_list("date"), collect_list("price")).alias("values")) \
.select("month", "stock", create_map("stock", "values").alias("values"))
out.write.partitionBy("month", "stock").format("json").save("out/prices")
Это даст вам такую структуру:
out
└── prices
├── _SUCCESS
└── month=2014-01-01
├── stock=FB
│ └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
├── stock=GOOG
│ └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
└── stock=MSFT
└── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
ИMSFT-файл выглядит следующим образом:
{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}
Хотя имя столбца «значения» может отсутствовать в структуре словаря, я надеюсь, что это иллюстрирует то, что вы можете сделать.