Отображение и уменьшение по ключу в df - PullRequest
1 голос
/ 02 мая 2019

Прежде всего - спасибо, что нашли время, чтобы прочитать мой вопрос.

Я пытаюсь освоить Pyspark, но у меня возникли проблемы с расхождением между RDD и DF в pyspark.

Мои данные: мои данные структурированы следующим образом:

+-------+-------------+-------+------+-----+
|     ID|         date| ticker|Return| Year|
+-------+-------------+-------+----- +-----+
| 215021|2018-04-12   |  XYZ  | 0.21 | 2018|
| 205021|2018-04-13   |  XYZ  | 0.18 | 2018|
   ...       ...         ...    ...   ...
| 5102  |2012-01-14   |  ABC  | 0.21 | 2012|
| 5102  |2012-01-05   |  ABC  |-1.01 | 2012|
  ...       ...         ...    ...   ... 
+-------+-------------+-------+------+-----+ 

По существу - у меня есть df акций и их доходности.Столбцы, которые меня действительно беспокоят: «Возвращение» и «Год».Я хочу посчитать среднюю доходность за год ...

В Python это будет:

df.groupby('Year').sum()

Однако я действительно не понимаю, как это сделать в Pyspark.Вот некоторые из моих мыслительных процессов и кода, которые я до сих пор пробовал ...

  1. Я думаю, мне нужно будет создать пару <key><value> для каждой строки, то есть мой ключбудет <year>, а значение будет <return>.Возможно, используя функцию отображения?Не слишком уверен, но вот что я пытался сделать:

    df.rdd.map(lambda y: (int(y[5]), float(y[4])))
    

    Однако всякий раз, когда я пытаюсь показать результаты, я получаю сообщение об ошибке, означающее, что я даже не уверен, что моя структура этого даже правильна.

  2. Расчет суммы в год - это потребует от меня ReducebyKey(year) ..., так что-то вроде:

    reduceByKey(year)
    

    Однако я получаюошибка

    NameError: name 'year' is not defined
    

Любое понимание этого будет с благодарностью.

1 Ответ

1 голос
/ 02 мая 2019

Лучше всего использовать операции с фреймами данных, поскольку ваши данные уже структурированы ... пример группировки по агрегации с использованием вашего набора данных.

df = spark.createDataFrame([([0.21, 2018]),
                           ([0.18, 2018]),
                           ([0.21, 2012]),
                           ([-1.01, 2012])], ["return", "year"])
df.printSchema()

root
 |-- return: double (nullable = true)
 |-- year: long (nullable = true)

from pyspark.sql.functions import *
df.groupBy("year").agg(avg("return").alias("avg_return")).show()

+----+----------+
|year|avg_return|
+----+----------+
|2012|      -0.4|
|2018|     0.195|
+----+----------+

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

В целом - DF - это высокоуровневый структурированный API Spark, который имеет схему / тип точно так же, как Pandas и R DF, тогда как RDD - это неструктурированный API Spark, который не имеет схемы и является просто необработанными (Python, Scala, Java) объектами типа строки. В основном весь код DF Spark компилируется в RDD, это просто в структурированном табличном формате.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...