Можно ли использовать pyspark для ускорения регрессионного анализа для каждого столбца очень большого размера массива? - PullRequest
1 голос
/ 18 июня 2019

У меня есть массив очень большого размера.Я хочу сделать линейную регрессию на каждом столбце массива.Чтобы ускорить вычисления, я создал список с каждым столбцом массива в качестве его элемента.Затем я использовал pyspark для создания RDD и затем применил к нему определенную функцию.У меня были проблемы с памятью при создании этого RDD (то есть распараллеливание).

Я пытался улучшить spark.driver.memory до 50g, установив spark-defaults.conf, но программа все еще кажется мертвой.

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from pyspark import SparkContext
sc = SparkContext("local", "get Linear Coefficients")

def getLinearCoefficients(column):
    y=column[~np.isnan(column)] # Extract column non-nan values
    x=np.where(~np.isnan(column))[0]+1 # Extract corresponding indexs plus 1
    # We only do linear regression interpolation when there are no less than 3 data pairs exist.
    if y.shape[0]>=3:
        model=LinearRegression(fit_intercept=True) # Intilialize linear regression model
        model.fit(x[:,np.newaxis],y) # Fit the model using data
        n=y.shape[0]
        slope=model.coef_[0]
        intercept=model.intercept_
        r2=r2_score(y,model.predict(x[:,np.newaxis]))
        rmse=np.sqrt(mean_squared_error(y,model.predict(x[:,np.newaxis])))
    else:
        n,slope,intercept,r2,rmse=np.nan,np.nan,np.nan,np.nan,np.nan
    return n,slope,intercept,r2,rmse

random_array=np.random.rand(300,2000*2000) # Here we use a random array without missing data for testing purpose.
columns=[col for col in random_array.T]
columnsRDD=sc.parallelize(columns)
columnsLinearRDD=columnsRDD.map(getLinearCoefficients)
n=np.array([e[0] for e in columnsLinearRDD.collect()])
slope=np.array([e[1] for e in columnsLinearRDD.collect()])
intercept=np.array([e[2] for e in columnsLinearRDD.collect()])
r2=np.array([e[3] for e in columnsLinearRDD.collect()])
rmse=np.array([e[4] for e in columnsLinearRDD.collect()])

Вывод программы был неизменным, как показано ниже.

Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:486)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:467)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:412)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:409)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:409)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:396)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Полагаю, можно использовать pyspark для ускорения вычислений, но как я могу это сделать?Изменение других параметров в spark-defaults.conf?Или векторизовать каждый столбец массива (я знаю, функция range () в Python3 делает это, и это действительно быстрее.)?

Ответы [ 2 ]

1 голос
/ 18 июня 2019

Это не сработает.В основном вы делаете три вещи:

  1. вы используете RDD для распараллеливания,
  2. вы вызываете функцию getLinearCoefficients () и, наконец,
  3. вы вызываете собираем () на нем, чтобы использовать существующий код.

В точке соприкосновения нет ничего плохого, но во втором и третьем шаге есть огромная ошибка.Ваша функция getLinearCoefficients () не имеет преимуществ от pyspark, так как вы используете numpy и sklearn (для более подробного объяснения посмотрите этот post ).Для большинства используемых вами функций есть эквивалент pyspark.Проблема с третьим шагом - это функция collect ().Когда вы вызываете collect (), pyspark переносит все строки RDD в драйвер и выполняет там функции sklearn.Поэтому вы получаете только распараллеливание, которое разрешено sklearn.Использование pyspark совершенно бессмысленно, как вы делаете это в настоящее время, и, возможно, даже является недостатком.Pyspark не является фреймворком, который позволяет вам запускать ваш код Python параллельно.Если вы хотите выполнить свой код параллельно с pyspark, вы должны использовать функции pyspark.

Так что же вы можете сделать?

  • Прежде всего вы можете использовать параметр n_jobs класса LinearRegession, чтобы использовать более одного ядра для своих расчетов.Это позволяет вам по крайней мере использовать все ядра одной машины.
  • Еще одна вещь, которую вы можете сделать, это отойти от sklearn и использовать linearRegression pyspark (взгляните на guide и api ).При этом вы можете использовать целый кластер для вашей линейной регрессии.
0 голосов
/ 18 июня 2019

Для больших наборов данных с более чем 100 тыс. Выборок использование LinearRegression не рекомендуется.Общий совет состоит в том, чтобы использовать SGDRegressor и правильно настроить параметры, чтобы использовалась потеря OLS:

from sklearn.linear_model import SGDRegressor

И замените LinearRegression на:

model = SGDRegressor(loss=’squared_loss’, penalty=’none’, fit_intercept=True)

Настройкаloss=’squared_loss’ и penalty=’none’ устанавливает SGDRegressor для использования OLS и без регуляризации, поэтому он должен давать результаты, аналогичные LinearRegression.

Попробуйте некоторые параметры, такие как learning_rate и eta0 / power_t, чтобы найти оптимальное в производительности.

Кроме того, я рекомендую использовать train_test_split, чтобы разбить набор данных и использовать набор тестов для оценки.Хороший размер теста для начала - test_size=.3.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...