Spark ML: Денормализация данных - PullRequest
0 голосов
/ 10 мая 2018

Мне нужно отменить нормализацию данных, которые были нормализованы с помощью метода ML MinMaxScaler в Spark.

Мне удалось нормализовать мои данные, выполнив следующие действия: Spark: преобразовать RDD [LabeledPoint] в Dataframe, чтобы применить MinMaxScaler, и после масштабирования получить нормализованный RDD [LabeledPoint] , который яопубликовал ранее.

Например, у оригинала df были два первых столбца, и после масштабирования результат был:

+------+--------------------+--------------------+
|labels|            features|      featuresScaled|
+------+--------------------+--------------------+
|   1.0|[6.0,7.0,42.0,1.1...|[1.0,0.2142857142...|
|   1.0|[6.0,18.0,108.0,3...|[1.0,1.0,1.0,1.0,...|
|   1.0|[5.0,7.0,35.0,1.4...|[0.0,0.2142857142...|
|   1.0|[5.0,8.0,40.0,1.6...|[0.0,0.2857142857...|
|   1.0|[6.0,4.0,24.0,0.6...|[1.0,0.0,0.0,0.0,...|
+------+--------------------+--------------------+

Проблема в том, что теперь мне нужновыполните противоположный процесс: отмените нормализацию.

Для этого мне нужны значения min и max для каждого столбца объектов в векторе features и значения, которые должны быть денормализованы.

Чтобы получить min и max, я прошу MinMaxScaler следующим образом:

val df_fitted = scaler.fit(df_all)
val df_fitted_original_min = df_fited.originalMin   // Vector
val df_fitted_original_max = df_fited.originalMax   // Vector

df_fited_original_min[1.0,1.0,7.0,0.007,0.052,0.062,1.0,1.0,7.0,1.0]
df_fited_original_max[804.0,553.0,143993.0,537.0,1.0,1.0,4955.0,28093.0,42821.0,3212.0]

И, с другой стороны, у меня есть DataFrame как это:

+--------------------+-----+--------------------+--------------------+-----+-----+--------------------+--------------------+--------------------+-----+
|               col_0|col_1|               col_2|               col_3|col_4|col_5|               col_6|               col_7|               col_8|col_9|
+--------------------+-----+--------------------+--------------------+-----+-----+--------------------+--------------------+--------------------+-----+
|0.009069428120139292|  0.0|9.015488712438252E-6|2.150418860440459E-4|  1.0|  1.0|0.001470074844665...|2.205824685144127...|2.780971210319238...|  0.0|
|0.008070826019024355|  0.0|3.379696051366339...|2.389342641479033...|  1.0|  1.0|0.001308210192425627|1.962949264985630...|1.042521123176856...|  0.0|
|0.009774715414895803|  0.0|1.299590589291292...|1.981673063697640...|  1.0|  1.0|0.001584395736407...|2.377361424206848...| 4.00879434193585E-5|  0.0|
|0.009631155146285946|  0.0|1.218569739510422...|2.016021040879828E-4|  1.0|  1.0|0.001561125874539...|2.342445354515269...|3.758872615157643E-5|  0.0|

Теперь мне нужно применить следующее уравнение, чтобы получить новые значения, но я не знаю, как мне это сделать.

X_original = ( X_scaled * (max - min) ) + min

Для каждой позиции в DF я должен применить это уравнение с соответствующими значениями max и min в векторе.

Например: в первой строке и столбце DF стоит 0.009069428120139292.В том же столбце соответствующие значения min и max: 1.0 и 804.0.Итак, денормализованное значение:

X_den = ( 0.009069428120139292 * (804.0 - 1.0) ) + 1.0

Необходимо уточнить, что ДФ, который был нормализован в первую очередь, во время программы был изменен.В связи с этим мне нужно применить денормализацию (если нет, самый простой способ - сохранить копию оригинального DF).

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Я получил ответ от следующего https://stackoverflow.com/a/50314767/9759150, плюс небольшая адаптация к моей проблеме. Я завершил процесс нормализации.

Давайте рассмотрим normalized_df как фрейм данных с 10 столбцами (показано в моем вопросе):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val updateFunction = (columnValue: Column, minValue: Int, maxValue: Int) =>
    (columnValue * ( lit(maxValue) - lit(minValue))) + lit(minValue)

val updateColumns = (df: DataFrame, minVector: Vector, maxVector: Vector, updateFunction: (Column, Int, Int) => Column) => {
    val columns = df.columns
    minVector.toArray.zipWithIndex.map{
      case (updateValue, index) =>
        updateFunction( col(columns(index.toInt)), minVector(index).toInt, maxVector(index).toInt ).as(columns(index.toInt))
    }
}

var dfUpdated = normalized_df.select(
  updateColumns(normalized_df, df_fitted_original_min, df_fitted_original_max, updateFunction) :_*
)
0 голосов
/ 11 мая 2018

Вы «просто» применяете обратные операции в обратном порядке.Уравнение находится в документации здесь .Интересующий код:

X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
X_scaled = X_std * (max - min) + min

Теперь у вас есть набор данных X_saled значений, и вы хотите восстановить исходные значения X.Ваша непосредственная проблема в том, что вы теряете некоторую базовую информацию в процессе преобразования.X_scaled - набор данных в диапазоне [0, 1];у вас нет возможности узнать, что было в исходном диапазоне.

Чтобы это сработало, найдите и сохраните исходные значения min и `max.Теперь легко отменить линейное преобразование для каждого элемента:

X_original = X_scaled * (max - min) + min

Можете ли вы взять его оттуда?

...