Как я могу выполнять операции с группами возможно большого DataFrame в PySpark без сбора данных для драйвера? - PullRequest
0 голосов
/ 13 мая 2019

Я пытаюсь реализовать алгоритм, который я сейчас разрабатываю, используя spark, чтобы, возможно, обрабатывать большие данные и выполнять операции параллельно.

Прямо сейчас я поворачиваю голову к вопросу о том, как выполнить операцию над частямиданные, не собирая их и не перераспределяя их.

Кажется невозможным ни использовать сеанс зажигания из метода карты, ни распространять сгруппированные данные, ни что-либо, что не включает сбор данных в драйвер, который удаляет любые полезныеприложение из-за медленных сериализаций.

Это может показаться тривиальным, но действительно трудно понять, как делать такие вещи быстрым способом.

Вариант использования:

  • У меня есть DataFrame, содержащий обучающие данные для нескольких моделей ML
  • I Присоединяюсь к другой таблице, которой назначается каждый пункт данныходин модельный индекс
  • Я хочу использовать spark для обучения этих моделей параллельно, но только сериализует данные, необходимые для обучения на работнике (не весь фрейм данных).

Какой-то псевдоподобный код для демонстрации того, чего я пытаюсь достичь:

sess = self.get_spark_session()
mapping = sess.createDataFrame(self.assign_data_points_to_model().zipWithIndex(), 
                                                   StructType([StructField('model_idx', IntegerType(), False),
                                                               StructField('index', IntegerType(), False)]))
X_train = self.get_X_train(without_index=False)
X_train_with_model = X_train.join(mapping, on=X_train.index == mapping.index)

X_train_with_model.registerTempTable("train_with_model_assignment")

def train_models(idx_model):
    curr_model_idx, model = idx_model

    # !!!!!!!!!!! This will not work. How to do something that does that?
    data_bc = sess.sql(f"SELECT * from train_with_model_assignment WHERE curr_model_idx = {model_idx}]")

    return model


rdd = sess.parallelize([*zip(range(0, len(self.get_models())), self.get_models())]).map(train_models)
fitted_models = rdd.collect()

Заранее спасибо за чтение этого более длинного поста:)

...