Как эффективно построить одну модель ML для каждого раздела в Spark с помощью foreachPartition? - PullRequest
0 голосов
/ 28 октября 2019

Я пытаюсь установить одну модель ML для каждого раздела моего набора данных, и я не знаю, как это сделать в Spark.

Мой набор данных в основном выглядит следующим образом, а разделен наКомпания :

Company | Features | Target

A         xxx        0.9
A         xxx        0.8
A         xxx        1.0
B         xxx        1.2
B         xxx        1.0
B         xxx        0.9
C         xxx        0.7
C         xxx        0.9
C         xxx        0.9

Моя цель состоит в том, чтобы параллельно подготовить по одному регрессору для каждой компании (у меня есть несколько сотен миллионов записей с 100 тыс. Компаний). Моя интуиция заключается в том, что мне нужно использовать foreachPartition, чтобы разделы (т.е. мои компании) обрабатывались параллельно, а каждая модель компании обучалась и сохранялась. Моя основная проблема заключается в том, как работать с типом iterator, который должен использоваться в функции, вызываемой foreachPartition.

Вот как это будет выглядеть:

dd.foreachPartition(

    iterator => {var company_df = operator.toDF()
                 var rg = RandomForestRegressor()
                                 .setLabelCol("target")
                                 .setFeaturesCol("features")
                                 .setNumTrees(10)
                 var model = rg.fit(company_df)
                 model.write.save(company_path)
                 }
)

Насколько я понимаю, попытка преобразовать iterator в dataframe невозможна, поскольку концепция RDD не может существовать сама по себе в выражении foreachPartition.

Я знаю, что вопрос достаточно открыт, но я действительно застрял.

1 Ответ

0 голосов
/ 29 октября 2019

В pyspark вы можете сделать что-то вроде ниже

import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
...