Условно преобразовать колонку в искру - PullRequest
1 голос
/ 16 мая 2019

Предположим, у меня есть кадр данных, такой как:

import org.apache.spark.sql.{Row, DataFrame, SparkSession}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DoubleType, NumericType}
import org.apache.spark.sql.functions.{udf, col, skewness}

val someData = Seq(
  Row(8, "bat"),
  Row(64, "mouse"),
  Row(-27, "horse"),
  Row(null, "mouse"),
  Row(27, null)
)

val someSchema = List(
  StructField("number", IntegerType, true),
  StructField("word", StringType, true)
)

val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

val df = someDF.withColumn("constantColumn", lit(1))

Я хотел бы рассчитать асимметрию каждого столбца, имеющего тип NumericType. Затем, если асимметрия столбца выше определенного порога, я бы хотел преобразовать его через f(x) = log(x + 1). (Я знаю, что выполнение log-преобразований на отрицательных данных даст NaN, но я хотел бы в конечном итоге написать код, который будет учитывать эту возможность).

Что я пробовал до сих пор:

Я нашел способ сделать это, но он требует изменяемого кадра данных df. Из моего ограниченного понимания, это не желательно.

val log1p = scala.math.log1p(_)
val log1pUDF = udf(scala.math.log1p(_: Double))
val transformThreshold = 0.04

// filter those columns which have a type that inherits from NumericType
val numericColumns = df.columns.filter(column => df.select(column).schema(0).dataType.isInstanceOf[NumericType])

// for columns having NumericType, filter those that are sufficiently skewed
val columnsToTransform = numericColumns.filter(numericColumn => df.select(skewness(df(numericColumn))).head.getDouble(0) > transformThreshold)

// for all columns that are sufficiently skewed, perform log1p transform and add it to df 
for(column <- columnsToTransform) {

   // df should be mutable here!
   df = df.withColumn(column + "_log1p", log1pUDF(df(column))) 
}

Мои вопросы:

  • Как мне достичь цели без использования изменяемых кадров данных?
  • Есть ли более простой / быстрый способ добиться того, что я пытался сделать?

(Запуск на Spark 2.4.0, Scala 2.11.12.)

1 Ответ

4 голосов
/ 16 мая 2019

Вместо структуры for() вы можете использовать рекурсивную функцию:

def rec(df: DataFrame, columns: List[String]): DataFrame = columns match {
  case Nil => df
  case h :: xs => rec(df.withColumn(s"${h}_log1p", log1pUDF(col(h))), xs)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...