Рассчитать автокорреляцию в писпарке - PullRequest
0 голосов
/ 17 марта 2020

В настоящее время я переносю свои скрипты из pandas в pyspark. Я хочу рассчитать автокорреляцию доходности для каждой акции в каждый день. Мои данные выглядят так:

+-----+--------+-------+----------+----------+
|stock| date   | hour  |  minute  |  return  |
+-----+--------+-------+----------+----------+
 VOD  | 01-02  |  10   |   13     |  0.05    |
 VOD  | 01-02  |  10   |   14     |  0.02    |
 VOD  | 01-02  |  10   |   16     | -0.02    | 
 VOD  | 01-02  |  11   |   13     |  0.05    |
 VOD  | 01-02  |  12   |   03     |  0.02    |
 VOD  | 01-02  |  13   |   45     | -0.02    | 
 ...     ...      ...     ....       ... 
 ABC  | 01-02  |  11   |   13     |  0.01    |
 ABC  | 01-02  |  11   |   14     |  0.02    |
 ABC  | 01-02  |  11   |   15     |  0.03    | 

Требуемый вывод должен выглядеть примерно так:

+-----+--------+-------+
|stock| date   | auto  | 
+-----+--------+-------+
 VOD  | 01-02  | 0.04  |  
 VOD  | 01-03  | 0.07  | 
 VOD  | 01-04  | 0.01  | 
 VOD  | 01-05  | 0.05  | 

Это очень просто сделать в pandas

df_auto=df.groupby[('stock','date')]['return'].apply(pd.Series.autocorr,lag=1).reset_index(name='auto')

Однако, Может кто-нибудь дать мне знать, как получить автокорреляцию фактора в pyspark? Благодаря.

1 Ответ

1 голос
/ 17 марта 2020

sample data:

df.show()

+-----+-----+----+------+------+
|stock| date|hour|minute|return|
+-----+-----+----+------+------+
|  VOD|01-02|  10|    13|  0.05|
|  VOD|01-02|  10|    14|  0.02|
|  VOD|01-02|  10|    16| -0.02|
|  VOD|01-02|  11|    13|  0.05|
|  VOD|01-02|  12|     3|  0.02|
|  VOD|01-02|  13|    45| -0.02|
+-----+-----+----+------+------+

Используйте groupby с collectist и примените udf в собранном списке. В искре нет функции автокорра, поэтому мы должны использовать панд / серию:

from pyspark.sql import functions as F

def autocorr(ret):
  import pandas as pd
  s = pd.Series(ret)
  return float(s.autocorr(lag=1))

auto=F.udf(autocorr, FloatType())

df.groupBy("stock","date").agg(F.collect_list(F.col("return")).alias("return")).withColumn("auto", auto("return")).select("stock","date","auto").show(truncate=False)


+-----+-----+-----------+
|stock|date |auto       |
+-----+-----+-----------+
|VOD  |01-02|-0.28925422|
+-----+-----+-----------+
...