Запуск регрессии по нескольким столбцам параллельно - PullRequest
0 голосов
/ 19 мая 2018

У меня очень широкий массив данных со столбцами меток.Я хочу запустить логистическую регрессию для каждого столбца независимо.Я пытаюсь найти наиболее эффективный способ запустить это параллельно.

+----------+--------+--------+--------+-----+------------+
| features | label1 | label2 | label3 | ... | label30000 |
+----------+--------+--------+--------+-----+------------+

Первоначально я хотел использовать ThreadPoolExecutor, получить результат для каждого столбца и присоединиться:

extract_prob = udf(lambda x: float(x[1]), FloatType())

def lr_for_column(argm):
    col_name = argm[0]
    test_res = argm[1]
    lr = LogisticRegression(featuresCol="features", labelCol=col_name, regParam=0.1)
    lrModel = lr.fit(tfidf)
    res = lrModel.transform(test_tfidf)
    test_res = test_res.join(res.select('id', 'probability'), on="id")
    test_res = test_res.withColumn(col_name, extract_prob('probability')).drop("probability")
    return test_res.select('id', col_name)


with futures.ThreadPoolExecutor(max_workers=100) as executor:
    future_results = [executor.submit(lr_for_column, [colname, test_res]) for colname in list_of_label_columns]
    futures.wait(future_results)
    for future in future_results:
       test_res = test_res.join(future.result(), on="id")

но этот метод не очень эффективен.Есть ли более быстрый способ сделать это?

1 Ответ

0 голосов
/ 19 мая 2018

Принимая во внимание доступные ресурсы, вы ничего не выиграете, используя ThreadPoolExecutor - с 32 ядрами в общей сложности и 200 разделами, которые вы можете обрабатывать только ~ 16% ваших данных одновременно, и этодробь может стать только хуже, если данные растут.

Если вы хотите обучить 30000 моделей и использовать количество итераций по умолчанию (100, вероятно, на практике не так много), ваша программа Spark отправит около 3 000 000 заданий (каждоеитерации, создайте отдельный), и только часть каждого из них может обрабатываться одновременно - это не дает большой надежды на улучшение, если вы не добавите больше ресурсов.

Несмотря на то, что есть некоторые вещи, которые вы можете попробовать:

  • Убедитесь, что окончательные характеристики не нужно пересчитывать.При необходимости запишите данные в постоянное хранилище и загрузите их обратно, а также убедитесь, что данные, передаваемые в модель, кэшируются.
  • Рассмотрите возможность применения некоторого алгоритма уменьшения размерности.Количество функций 300000 не только велико, но и близко к количеству записей (500000).Это не только вычислительно дорого, но и может привести к серьезному переоснащению.
  • Если вы решите уменьшить размеры, рассмотрите выборку, чтобы еще больше уменьшить размер ваших обучающих данных, и, следовательно, уменьшить количество разделов и увеличить общее количество.пропускная способность.

    Если в ваших данных наблюдаются сильные линейные тренды, они должны быть видны даже на небольшой выборке без значительной потери точности.

  • Рассмотрите возможность замены дорогой pyspark.mlАлгоритм с вариантом, который не требует нескольких заданий, например, с использованием некоторой комбинации инструментов из spark-sklearn (можно создать модель ансамбля, установив модель sklearn на каждом разделе).

  • Переподписка ядер.Например, если у вас есть 4 физических ядра / узел, разрешите 8 или 16 учитывать время ожидания ввода-вывода.

...