Как ускорить VIF (дисперсионный коэффициент инфляции) через искру (pySpark) - PullRequest
0 голосов
/ 25 июня 2019

Мне нужно уменьшить количество измерений из моих данных.Я хочу использовать VIF для этого.Мои данные в настоящее время содержат 800 000+ строк и 300+ столбцов.

Я использую платформу spark на платформе данных в Azure с Python в качестве предпочитаемого языка программирования.Тем не менее, я буду рад решению R, если таковое имеется.

Я использую следующий код, чтобы получить расчеты VIF.Однако, поскольку он работает в цикле for, он не запускается параллельно.Я уже пытался заменить цикл на карту rdd, сохранив диапазон столбцов как rdd и используя лямбда-функцию для параллельного запуска всех линейных моделей.Однако это дало мне ошибку, что вы не можете вызвать экземпляр искры с рабочего узла.

def vif_cal_iter(inputdata,vif_threshold):
  xvar_names = inputdata.columns
  vif_max = vif_threshold + 1
  def vif_cal(inputdata, xvar_names, vif_max, colnum_max, vif_threshold):
    vif_max = vif_threshold
    for i in range(2,len(xvar_names)):
      train_t = inputdata.rdd.map(lambda x: [Vectors.dense(x[3:i]+x[i+2:]), x[i]]).toDF(['features', 'label'])
      lr = LinearRegression(featuresCol = 'features', labelCol = 'label', maxIter=2)
      lr_model = lr.fit(train_t)
      r_sq = lr_model.summary.r2
      vif=1/(1-r_sq)
      if vif_max < vif:
        vif_max = vif
        colnum_max = i
    return vif_max, colnum_max
  while vif_max > 5:
    vif_max, colnum_max = vif_cal(inputdata, xvar_names, vif_max, colnum_max, vif_threshold)
    if vif_max > vif_threshold:
        print("Start of If Block")
        inputdata = inputdata.drop(inputdata[colnum_max])
  else:
    return inputdata

В своем текущем виде выполнение кода занимает много времени, более 20 часов только для 5% данных.Мне нужно выполнить больше данных, чем 5% через функцию VIF, намного меньше времени.

Заранее спасибо за помощь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...