Применение UDF при объединении двух фреймов данных Spark с интервалами - PullRequest
0 голосов
/ 27 августа 2018

У меня есть фрейм данных с тремя столбцами: id, index и value.

+---+-----+-------------------+
| id|index|              value|
+---+-----+-------------------+
|  A| 1023|0.09938822262205915|
|  A| 1046| 0.3110047630613805|
|  A| 1069| 0.8486710971453512|
+---+-----+-------------------+

root
 |-- id: string (nullable = true)
 |-- index: integer (nullable = false)
 |-- value: double (nullable = false)

Затем у меня есть еще один фрейм данных, который показывает желательные периоды для каждого id:

+---+-----------+---------+
| id|start_index|end_index|
+---+-----------+---------+
|  A|       1069|     1276|
|  B|       2066|     2291|
|  B|       1616|     1841|
|  C|       3716|     3932|
+---+-----------+---------+

root
 |-- id: string (nullable = true)
 |-- start_index: integer (nullable = false)
 |-- end_index: integer (nullable = false)

У меня есть три шаблона, как показано ниже

val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)

Цель состоит в том, чтобы для каждой строки в dfIntervals применить функцию (предположим, что это корреляция), в которой функция получает value столбец из dfRaw и три массива шаблонов и добавляет три столбца к dfIntervals, каждый столбец связан с каждым шаблоном.

Допущения: 1 - Размеры массивов шаблонов составляют ровно 10.

2 - В столбце index столбцов dfRaw

3 - start_index и end_index столбцов в dfIntervals отсутствуют столбцы index, а в столбце dfRaw и при наличиировно 10 рядов между ними.Например, dfRaw.filter($"id" === "A").filter($"index" >= 1069 && $"index" <= 1276).count (первая строка в dfIntervals) дает ровно 10.

Вот код, который генерирует эти кадры данных:

import org.apache.spark.sql.functions._
val mySeed = 1000

/* Defining templates for correlation analysis*/
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)

/* Defining raw data*/
var dfRaw = Seq(
  ("A", (1023 to 1603 by 23).toArray),
  ("B", (341 to 2300 by 25).toArray),
  ("C", (2756 to 3954 by 24).toArray)
).toDF("id", "index")
dfRaw = dfRaw.select($"id", explode($"index") as "index").withColumn("value", rand(seed=mySeed))

/* Defining intervals*/
var dfIntervals = Seq(
  ("A", 1069, 1276),
  ("B", 2066, 2291),
  ("B", 1616, 1841),
  ("C", 3716, 3932)
).toDF("id", "start_index", "end_index")

В результате три столбца добавляются к dfIntervals датафрейм с именами corr_w_template1, corr_w_template2 и corr_w_template3

PS: я не смог найти корреляционную функцию в Scala.Давайте предположим, что такая функция существует (как показано ниже), и мы собираемся сделать из нее udf.

def correlation(arr1: Array[Double], arr2: Array[Double]): Double

1 Ответ

0 голосов
/ 27 августа 2018

ОК.

Давайте определим функцию UDF.

Для целей тестирования, давайте всегда будем возвращать 1.

 val correlation = functions.udf( (values: mutable.WrappedArray[Double], template: mutable.WrappedArray[Double]) => {

     1f
  })

val orderUdf = udf((values: mutable.WrappedArray[Row]) => {
    values.sortBy(r => r.getAs[Int](0)).map(r => r.getAs[Double](1))
  })

Затем давайте объединим ваши 2 фрейма данных с определенными правилами и соберем value в 1 столбец с именем values.Кроме того, примените наш orderUdf

 val df = dfIntervals.join(dfRaw,dfIntervals("id") === dfRaw("id") && dfIntervals("start_index")  <= dfRaw("index") && dfRaw("index") <= dfIntervals("end_index") )
    .groupBy(dfIntervals("id"), dfIntervals("start_index"),  dfIntervals("end_index"))
    .agg(orderUdf(collect_list(struct(dfRaw("index"), dfRaw("value")))).as("values"))

Наконец, примените наш udf и покажите его.

df.withColumn("corr_w_template1",correlation(df("values"), lit(template1)))
    .withColumn("corr_w_template2",correlation(df("values"), lit(template2)))
    .withColumn("corr_w_template3",correlation(df("values"), lit(template3)))
    .show(10)

Это полный пример кода:

import org.apache.spark.sql.functions._
  import scala.collection.JavaConverters._

  val conf = new SparkConf().setAppName("learning").setMaster("local[2]")

  val session = SparkSession.builder().config(conf).getOrCreate()



  val mySeed = 1000

  /* Defining templates for correlation analysis*/
  val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
  val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
  val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)

  val schema1 =  DataTypes.createStructType(Array(
    DataTypes.createStructField("id",DataTypes.StringType,false),
    DataTypes.createStructField("index",DataTypes.createArrayType(DataTypes.IntegerType),false)
  ))

  val schema2 =  DataTypes.createStructType(Array(
    DataTypes.createStructField("id",DataTypes.StringType,false),
    DataTypes.createStructField("start_index",DataTypes.IntegerType,false),
    DataTypes.createStructField("end_index",DataTypes.IntegerType,false)
  ))

  /* Defining raw data*/
  var dfRaw = session.createDataFrame(Seq(
    ("A", (1023 to 1603 by 23).toArray),
    ("B", (341 to 2300 by 25).toArray),
    ("C", (2756 to 3954 by 24).toArray)
  ).map(r => Row(r._1 , r._2)).asJava, schema1)

  dfRaw = dfRaw.select(dfRaw("id"), explode(dfRaw("index")) as "index")
    .withColumn("value", rand(seed=mySeed))

  /* Defining intervals*/
  var dfIntervals =  session.createDataFrame(Seq(
    ("A", 1069, 1276),
    ("B", 2066, 2291),
    ("B", 1616, 1841),
    ("C", 3716, 3932)
  ).map(r => Row(r._1 , r._2,r._3)).asJava, schema2)

  //Define udf

  val correlation = functions.udf( (values: mutable.WrappedArray[Double], template: mutable.WrappedArray[Double]) => {
     1f
  })

  val orderUdf = udf((values: mutable.WrappedArray[Row]) => {
    values.sortBy(r => r.getAs[Int](0)).map(r => r.getAs[Double](1))
  })


  val df = dfIntervals.join(dfRaw,dfIntervals("id") === dfRaw("id") && dfIntervals("start_index")  <= dfRaw("index") && dfRaw("index") <= dfIntervals("end_index") )
    .groupBy(dfIntervals("id"), dfIntervals("start_index"),  dfIntervals("end_index"))
    .agg(orderUdf(collect_list(struct(dfRaw("index"), dfRaw("value")))).as("values"))

  df.withColumn("corr_w_template1",correlation(df("values"), lit(template1)))
    .withColumn("corr_w_template2",correlation(df("values"), lit(template2)))
    .withColumn("corr_w_template3",correlation(df("values"), lit(template3)))
    .show(10,false)
...