Получить корреляционную матрицу в pyspark с большим набором данных - PullRequest
2 голосов
/ 27 февраля 2020

Я хочу вычислить матрицу корреляции большого набора данных (1M строк). Идея состоит в том, чтобы рассчитать соотношение продаж продукта. Если два продукта имеют одинаковое увеличение / уменьшение продаж по сравнению с прошлым годом, возможно, существует корреляция.

Я уже пробовал сообщения здесь:

Которые все более или менее делают то же самое, но собирают матрицу корреляции обратно в драйвер. Что является проблемой, поскольку большой набор данных делает эту коллекцию ОЗУ интенсивной . Я ищу способ разбить эту проблему на части и использовать распределенные вычисления Spark. Уникальных продуктов 170 тыс., Поэтому задание выполняется 170 тыс. Раз, а комбинаций 29Б.

Моя идея состоит в том, чтобы рассчитать корреляцию столбец за столбцом (перекрестное применение), а затем собрать ее во фрейме данных (или СДР) запускать фильтры поверх него (только с соотношением> 0,8). Но я не знаю, с чего начать.

Набор данных в основном выглядит следующим образом.

d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
     'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
     'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)

Я транспонирую данные, чтобы в столбцах были указаны годы.

df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)

Я вычисляю pct_change, чтобы иметь относительное изменение по сравнению с прошлым годом.

df_diff = df.pct_change(axis=1).replace([np.inf, -np.inf], np.nan).fillna(0)

Year     2010      2011      2012
Product                          
A         0.0  0.100000  0.090909
B         0.0 -0.050000  0.157895
C         0.0  0.066667  0.093750

И мне понадобится соотношение ... С pandas easy

# change structure
df_diff = df_diff.stack().unstack(level=0)
# get correlation
df_diff = df_diff.corr().abs()
# change structure back
df_diff = df_diff.unstack().to_frame(name='value')
df_diff.index = df_diff.index.set_names(['Product_1', 'Product_2'])
df_diff.reset_index(inplace=True)

    Product_1 Product_2     value
0         A         A  1.000000
1         A         B  0.207317
2         A         C  0.933485
3         B         A  0.207317
4         B         B  1.000000
5         B         C  0.544352
6         C         A  0.933485
7         C         B  0.544352
8         C         C  1.000000

Ответы [ 2 ]

0 голосов
/ 05 марта 2020

Я использовал udf и сопоставил его с искрой df. С помощью numOfPartitions вы можете контролировать количество задач, которые генерируются и распределяются по рабочим узлам. В моем примере я использовал 16 узлов с 8 процессорами в каждом и разделил df на 10000 разделов.

import pandas as pd
import numpy as np

d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
     'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
     'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)

df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)

df_diff = df.pct_change(axis=1, limit=1).replace([np.inf, -np.inf], np.nan).fillna(0)
df_diff = df_diff.dropna(how='all')

# pivot columns and rows to have year on rows and product on columns
df_diff_piv = df_diff.stack().unstack(level=0).sort_index()

# bring to spark df
df_diff_spark = spark.createDataFrame(df_diff.reset_index())

# correlate on at least x periods
correlation_min_periods = 1 # I used 10 for a 20 periods dataset

# set num of partitions to parallelize on tasks 
numOfPartitions = 200 #200 is default

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType

schema = StructType(
    [
    StructField("Product_1", StringType()),
    StructField("Product_2", StringType()),
    StructField("corr", StringType()) #cant get it to work on FloatType()
    ]
)

def calculate_correlation(product):
  data = df_diff_piv
  arr = []
  for col in data.columns:
    m1 = product
    m2 = data[col].name
    c = np.absolute(data[product].corr(data[col])) #, min_periods=correlation_min_periods
    arr.append([m1, m2, str(c)]) #cant get it to work on FloatType()
  return arr

#register udf
spark.udf.register("calculate_correlation_udf", calculate_correlation)
calculate_correlation_udf = udf(calculate_correlation, ArrayType(schema))

#apply udf to distinct product
distinct_product = df_diff_spark.select("Product").distinct().repartition(numOfPartitions)
res = distinct_product.select("Product", calculate_correlation_udf("Product").alias("corr_matrix"))

from pyspark.sql.functions import explode

# explode (flatten) array and struct back to dataframe
expl = res.select(explode("corr_matrix").alias("corr_row"))
rowlevel = expl.select("corr_row.Product_1","corr_row.Product_2","corr_row.corr")

# convert string to float
rowlevel = rowlevel.withColumn("corr", rowlevel["corr"].cast(FloatType()))

rowlevel.show()
0 голосов
/ 03 марта 2020

Итак, должно работать следующее (по крайней мере, для игрушечного примера): Мне было бы интересно услышать, как оно масштабируется:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# define pyspark df
d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
     'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
     'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = spark.createDataFrame(pd.DataFrame(data=d))

# Define a window over products and calc %-changes over time
win = Window().partitionBy("Product").orderBy("Year")
df = df.withColumn("pct_change", 
      F.col("Revenue")/F.lag(F.col("Revenue")).over(win) - 1
)

# replace nulls with 0
df = df.na.fill(0)

# pivot
df = (df.groupBy("Product")
    .pivot("Year")
    .agg(F.first("pct_change"))
    .orderBy("Product"))

# Get pair-RDD of (product, %-changes) and cross-join
numerical_cols = df.columns[1:]
rdd = df.rdd.map(lambda x: (x['Product'], [x[col] for col in numerical_cols]))
rdd = rdd.cartesian(rdd)

# correlation helper function
def corr(pair):
    (prod1, series1), (prod2, series2) = pair
    corr = pd.Series(series1).corr(pd.Series(series2))
    return (prod1, prod2, float(corr))

# pairwise correlation DF
corr_df = rdd.map(corr).toDF(schema=['Product_1', 'Product_2', 'value'])
corr_df.show(5)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...