В настоящее время у меня есть Spark Dataframe с 2 столбцами: 1) столбец, в котором каждая строка содержит вектор прогнозирующих элементов 2) столбец, содержащий прогнозируемое значение.
Чтобы определить наиболее прогнозируемые функции для использовать в более поздней модели, я использую обратное исключение по P-значению, как обрисовано в общих чертах этой статьей . Ниже приведен мой код:
num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
for i in range(0, num_vars):
model = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
model = model.fit(scoresDf)
p_values = model.summary.pValues
max_p = np.max(p_values)
if max_p > 0.05:
max_index = p_values.index(max_p)
drop_max_index_udf = udf(lambda elem, drop_index, var_count:
Vectors.dense([elem[j] for j in range(var_count) if j not in [drop_index]]), VectorUDT())
scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
lit(max_index), lit(num_vars)))
num_vars = scoresDf.select("filtered_features").take(1)[0][0].__len__()
Код выполняется, но единственная проблема заключается в том, что каждая итерация занимает значительно больше времени, чем предыдущая. На основании ответа на этот вопрос представляется, что код каждый раз переоценивает все предыдущие итерации.
В идеале я хотел бы передать всю логику c в некоторую структуру конвейера, которая будет хранить все это лениво, а затем последовательно выполнять без повторов при вызове, но я не уверен, является ли это даже возможно, учитывая, что ни одна из функций оценки / преобразования Spark, по-видимому, не подходит для этого варианта использования.
Любое руководство будет оценено, спасибо!