Spark: запуск в обратном направлении по P-значению с линейными регрессиями - PullRequest
0 голосов
/ 05 января 2020

В настоящее время у меня есть Spark Dataframe с 2 столбцами: 1) столбец, в котором каждая строка содержит вектор прогнозирующих элементов 2) столбец, содержащий прогнозируемое значение.

Чтобы определить наиболее прогнозируемые функции для использовать в более поздней модели, я использую обратное исключение по P-значению, как обрисовано в общих чертах этой статьей . Ниже приведен мой код:

num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
for i in range(0, num_vars):
    model = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
    model = model.fit(scoresDf)
    p_values = model.summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        max_index = p_values.index(max_p)
        drop_max_index_udf = udf(lambda elem, drop_index, var_count:
                                 Vectors.dense([elem[j] for j in range(var_count) if j not in [drop_index]]), VectorUDT())
        scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
                                                                               lit(max_index), lit(num_vars)))
        num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()

Код выполняется, но единственная проблема заключается в том, что каждая итерация занимает значительно больше времени, чем предыдущая. На основании ответа на этот вопрос представляется, что код каждый раз переоценивает все предыдущие итерации.

В идеале я хотел бы передать всю логику c в некоторую структуру конвейера, которая будет хранить все это лениво, а затем последовательно выполнять без повторов при вызове, но я не уверен, является ли это даже возможно, учитывая, что ни одна из функций оценки / преобразования Spark, по-видимому, не подходит для этого варианта использования.

Любое руководство будет оценено, спасибо!

Ответы [ 2 ]

0 голосов
/ 06 января 2020

почему вы делаете

model = model.fit(scoresDf)

, когда scoredDfs содержит ваш новый df с одной менее независимой переменной?


Если вы измените свой код следующим образом:

independent_vars = ['x0', 'x1', 'x2', 'x3', 'x4']

def remove_element(array, index):
    return Vectors.dense(np.delete(array, index, 0))

remove_element_udf = udf(lambda a, i: remove_element(a, i), VectorUDT())

max_p = 1
i = 0
while (max_p > 0.05):
    model = LinearRegression(featuresCol="filtered_features", 
                             labelCol="averageScore",
                             fitIntercept=False)
    model = model.fit(scoresDf)

    print('iteration: ', i)
    summary = model.summary
    summary_df = pd.DataFrame({
        'var': independent_vars,
        'coeff': model.coefficients,
        'se': summary.coefficientStandardErrors,
        'p_value': summary.pValues
    })
    print(summary_df)
    print("r2: %f" % summary.r2)    

    p_values = summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        max_index = p_values.index(max_p)
        max_var = independent_vars[max_index]
        print('-> max_index {max_index}, corresponding to var {var}'.format(max_index=max_index, var=max_var))
        scoresDf = scoresDf.withColumn("filtered_features", remove_element_udf(scoresDf["filtered_features"],
                                                                               lit(max_index)))
        independent_vars = np.delete(independent_vars, max_index, 0)

    print()
    i += 1

вы получите

iteration:  0
  var     coeff        se   p_value
0  x0  0.174697  0.207794  0.402616
1  x1 -0.448982  0.203421  0.029712
2  x2 -0.452940  0.233972  0.055856
3  x3 -3.213578  0.209935  0.000000
4  x4  3.790730  0.212917  0.000000
r2: 0.870330
-> max_index 0, corresponding to var x0

iteration:  1
  var     coeff        se   p_value
0  x1 -0.431835  0.202087  0.035150
1  x2 -0.460711  0.233432  0.051297
2  x3 -3.218725  0.209525  0.000000
3  x4  3.768661  0.210970  0.000000
r2: 0.869365
-> max_index 1, corresponding to var x2

iteration:  2
  var     coeff        se   p_value
0  x1 -0.479803  0.203592  0.020449
1  x3 -3.344830  0.202501  0.000000
2  x4  3.669419  0.207925  0.000000
r2: 0.864065

в первой и второй итерации, две независимые переменные с p-значением больше 0,05 удаляются

0 голосов
/ 06 января 2020

Вы постоянно создаете модель внутри al oop. Это трудоемкий процесс, который необходимо выполнять один раз для набора обучающих данных и набора параметров. Попробуйте следующее -


num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
modelAlgo = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
model = modelAlgo.fit(scoresDf)

for i in range(0, num_vars):

    p_values = model.summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        max_index = p_values.index(max_p)
        drop_max_index_udf = udf(lambda elem, drop_index, var_count:
                                 Vectors.dense([elem[j] for j in range(var_count) if j not in [drop_index]]), VectorUDT())
        scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
                                                                               lit(max_index), lit(num_vars)))
        num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()

Как только вы довольны моделью, сохраните ее. Когда вам нужно оценить свои данные, просто прочитайте эту модель и прогнозируйте ее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...