PySpark: создать диктат из данных? - PullRequest
0 голосов
/ 26 декабря 2018

У меня есть данные в следующем формате, которые получены из Hive в фрейм данных:

date, stock, price
1388534400, GOOG, 50
1388534400, FB, 60
1388534400, MSFT, 55
1388620800, GOOG, 52
1388620800, FB, 61
1388620800, MSFT, 55

Где дата - это эпоха полуночи в этом дне, и у нас есть данные, возвращающиеся примерно на 10 лет назад(800 миллионов + рядов).Я хотел бы получить словарь следующим образом:

{
'GOOG':
{
'1388534400': 50,
'1388620800': 52
}

'FB':
{
'1388534400': 60,
'1388620800': 61
}
}

Наивным способом было бы получить список уникальных акций, а затем получить подмножество фрейма данных, отфильтровывая только эти строки для каждой акции, ноэто кажется чрезмерно наивным и ужасно неэффективным.Можно ли это легко сделать в Spark?В настоящее время он работает на нативном Python с использованием PyHive, но из-за большого объема данных я предпочел бы сделать это на кластере / Spark.

Ответы [ 2 ]

0 голосов
/ 26 декабря 2018

В версии 2.4 можно использовать map_from_arrays для построения карт даты-значения при агрегировании значений для каждой акции.Тогда это просто вопрос использования create_map, чтобы использовать символ тикера в качестве ключа.В этом примере используется ChainMap из Python 3.4 для построения окончательной структуры dict, как вы описали.

import json
from collections import ChainMap
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("example") \
    .getOrCreate()

df = spark.createDataFrame([
    (1388534400, "GOOG", 50),
    (1388534400, "FB", 60),
    (1388534400, "MSFT", 55),
    (1388620800, "GOOG", 52),
    (1388620800, "FB", 61),
    (1388620800, "MSFT", 55)]
).toDF("date", "stock", "price")

out = df.groupBy("stock") \
        .agg(
            map_from_arrays(
                collect_list("date"), collect_list("price")).alias("values")) \
        .select(create_map("stock", "values").alias("values")) \
        .rdd.flatMap(lambda x: x) \
        .collect()

print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))

Что дает:

{                                                                               
    "FB": {
        "1388534400": 60,
        "1388620800": 61
    },
    "GOOG": {
        "1388534400": 50,
        "1388620800": 52
    },
    "MSFT": {
        "1388534400": 55,
        "1388620800": 55
    }
}

Однако , как выскажем, у вас есть много данных, которые вы, вероятно, на самом деле не хотите создавать в словаре, так что, вероятно, вам лучше разделить их и записать одну и ту же структуру словаря в файлы для разных разделов.

Давайтесделайте это, укоротив даты до данного месяца и написав отдельный файл для каждого месяца и для каждой акции:

out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \
        .agg(
            map_from_arrays(
                collect_list("date"), collect_list("price")).alias("values")) \
        .select("month", "stock", create_map("stock", "values").alias("values"))

out.write.partitionBy("month", "stock").format("json").save("out/prices")

Это даст вам такую ​​структуру:

out
└── prices
    ├── _SUCCESS
    └── month=2014-01-01
        ├── stock=FB
        │   └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
        ├── stock=GOOG
        │   └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
        └── stock=MSFT
            └── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json

ИMSFT-файл выглядит следующим образом:

{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}

Хотя имя столбца «значения» может отсутствовать в структуре словаря, я надеюсь, что это иллюстрирует то, что вы можете сделать.

0 голосов
/ 26 декабря 2018

Я использую Spark 2.3.1 Это PySpark версия -

from pyspark.sql.functions import udf,collect_list,create_map
from pyspark.sql.types import MapType,IntegerType,StringType

myValues = [('1388534400', 'GOOG', 50), ('1388534400', 'FB', 60), ('1388534400', 'MSFT', 55), ('1388620800', 'GOOG', 52),
('1388620800', 'FB', 61), ('1388620800', 'MSFT', 55)]
df = sqlContext.createDataFrame(myValues,['date','stock','price'])
df.show()
+----------+-----+-----+
|      date|stock|price|
+----------+-----+-----+
|1388534400| GOOG|   50|
|1388534400|   FB|   60|
|1388534400| MSFT|   55|
|1388620800| GOOG|   52|
|1388620800|   FB|   61|
|1388620800| MSFT|   55|
+----------+-----+-----+

combineMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
             MapType(StringType(),IntegerType()))

combineDeepMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
             MapType(StringType(),MapType(StringType(),IntegerType())))

mapdf = df.groupBy('stock')\
.agg(collect_list(create_map('date','price')).alias('maps'))\
.agg(combineDeepMap(collect_list(create_map('stock',combineMap('maps')))))

new_dict= mapdf.collect()[0][0]
print(new_dict)
   {u'GOOG': {u'1388620800': 52, u'1388534400': 50}, u'FB': {u'1388620800': 61, u'1388534400': 60}, u'MSFT': {u'1388620800': 55, u'1388534400': 55}}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...