Я пытаюсь реализовать алгоритм, который я сейчас разрабатываю, используя spark, чтобы, возможно, обрабатывать большие данные и выполнять операции параллельно.
Прямо сейчас я поворачиваю голову к вопросу о том, как выполнить операцию над частямиданные, не собирая их и не перераспределяя их.
Кажется невозможным ни использовать сеанс зажигания из метода карты, ни распространять сгруппированные данные, ни что-либо, что не включает сбор данных в драйвер, который удаляет любые полезныеприложение из-за медленных сериализаций.
Это может показаться тривиальным, но действительно трудно понять, как делать такие вещи быстрым способом.
Вариант использования:
- У меня есть DataFrame, содержащий обучающие данные для нескольких моделей ML
- I Присоединяюсь к другой таблице, которой назначается каждый пункт данныходин модельный индекс
- Я хочу использовать spark для обучения этих моделей параллельно, но только сериализует данные, необходимые для обучения на работнике (не весь фрейм данных).
Какой-то псевдоподобный код для демонстрации того, чего я пытаюсь достичь:
sess = self.get_spark_session()
mapping = sess.createDataFrame(self.assign_data_points_to_model().zipWithIndex(),
StructType([StructField('model_idx', IntegerType(), False),
StructField('index', IntegerType(), False)]))
X_train = self.get_X_train(without_index=False)
X_train_with_model = X_train.join(mapping, on=X_train.index == mapping.index)
X_train_with_model.registerTempTable("train_with_model_assignment")
def train_models(idx_model):
curr_model_idx, model = idx_model
# !!!!!!!!!!! This will not work. How to do something that does that?
data_bc = sess.sql(f"SELECT * from train_with_model_assignment WHERE curr_model_idx = {model_idx}]")
return model
rdd = sess.parallelize([*zip(range(0, len(self.get_models())), self.get_models())]).map(train_models)
fitted_models = rdd.collect()
Заранее спасибо за чтение этого более длинного поста:)