Передать строку / объект из скользящего окна в UDF с помощью Spark - PullRequest
0 голосов
/ 11 мая 2018

Я пытаюсь обработать строку / объект со многими столбцами как часть скользящего окна, используя пользовательский UDF. Отдельные столбцы могут быть переданы в пользовательский UDF с использованием collect_list, но, учитывая наличие большого количества столбцов, я бы предпочел передать строку / объект напрямую, чтобы упростить управление кодом (поскольку столбцы будут добавляться / удаляться, и не все столбцы будут всегда требуется для обработки). Ниже приведен упрощенный пример того, что я пытаюсь сделать.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,udf}
import org.apache.spark.sql.Row

case class Foo(a:Int, b:String )
case class FooResult(a1:Int, b1:String, a2:Int, b2:String, a3:Int, b3:String, computation1:Int, computation2:String  )

object Main extends App {

    val ss = SparkSession.builder()
      .appName("DataSet Test")
        .master("local[*]").getOrCreate()

    import ss.implicits._

    val foos = Seq(Foo(-18, "Z"),
        Foo(-11, "G"),
        Foo(-8, "A"),
        Foo(-4, "C"),
        Foo(-1,"F")).toDS()

    // work on 3 rows
    val sliding_window_spec = Window.orderBy(desc("a")).rowsBetween( -2, 0)

    // ????
    // how to pass in whole object to a custom UDF
    // tried $"root", $""
    // ????
    foos.select(slidingUdf(  $"*".over(sliding_window_spec)).as("test"))
      .filter(col("test").isNotNull)
      .select(col("test.*"))
      .show(false)

    ss.stop()

    def slidingUdf = udf((list1: Seq[Foo] )=> {
                if(list1.size < 3) {
                  null
                }
                else {
                    FooResult(list1(0).a, list1(0).b, list1(1).a, list1(1).b, list1(2).a, list1(2).b, list1(0).a + list1(1).a, list1(0).b + list1(1).b + list1(2).b )
                }
            })

}

Как я могу передать строку / объект foo в UDF? Я использую Apache Spark 2.3.0

Ответы [ 2 ]

0 голосов
/ 13 мая 2018

Решение, которое я использую на основе Рамеша Махарджана указателей

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,udf,collect_list}
import org.apache.spark.sql.Row

case class Foo(a:Int, b:String ) {
  def toFooWrapper:FooWrapper = FooWrapper(this,true)
}

// utility class to convert Foo from correctly defined Row
object Foo {
  def fromRow( row:Row ):Foo = {
    Foo(row.getAs[Int]("a"),row.getAs[String]("b"))
  }
}

case class FooWrapper(foo:Foo, wrapper:Boolean)

case class FooResult(a1:Int, b1:String, a2:Int, b2:String, a3:Int, b3:String, computation1:Int, computation2:String )

object Main extends App {

    val ss = SparkSession.builder()
      .appName("DataSet Test")
        .master("local[*]").getOrCreate()

    import ss.implicits._

    val foos = Seq(Foo(-18, "Z"),
        Foo(-11, "G"),
        Foo(-8, "A"),
        Foo(-4, "C"),
        Foo(-1,"F")).toDS()

    // workaround - build wrapper
    val fooWrappers = foos.map( x => x.toFooWrapper )

    // work on 3 rows
    // ordering based on first column, second column, etc within Foo structure
    val sliding_window_spec = Window.orderBy(desc("foo")).rowsBetween(-2, 0)

    // grab foo from fooWrapper
  fooWrappers.select(slidingUdf( collect_list("foo").over(sliding_window_spec) ).as("test"))
    .filter(col("test").isNotNull)
    .select(col("test.*"))
    .show(false)

    ss.stop()

    def slidingUdf = udf( (rows:Seq[Row]) => {
        if(rows.size < 3) {
            null
        }
        else {
            val foos = rows.map( x => Foo.fromRow(x))
            FooResult(foos(0).a, foos(0).b, foos(1).a, foos(1).b, foos(2).a, foos(2).b, foos(0).a + foos(1).a, foos(0).b + foos(1).b + foos(2).b )
        }
    })

}
0 голосов
/ 11 мая 2018

Полагаю, вы пытаетесь достичь того же результата, что и ваш другой вопрос . Ваша логика кажется идеальной, но вы допустили определенные ошибки. Итак, начнем с ваших ошибок

Ошибка 1

, если вы определите набор данных как

val foos = Seq(Foo(-18, "Z"),
  Foo(-11, "G"),
  Foo(-8, "A"),
  Foo(-4, "C"),
  Foo(-1,"F")).toDS()

Вы получите набор данных с схемой как

+---+---+
|a  |b  |
+---+---+
|-18|Z  |
|-11|G  |
|-8 |A  |
|-4 |C  |
|-1 |F  |
+---+---+

root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

Как вы можете видеть, Объект структуры Foo уже разделен на разные столбцы (это не то, что вам нужно)

Ошибка 2

Вы определили вашу udf функцию как

def slidingUdf = udf((list1: Seq[Foo] )=> {...

невозможно использовать пользовательские типы данных во входных аргументах функций udf . Вы можете просто использовать типы данных sql только

Ошибка 3

$"*" не собирает все значения столбцов в диапазоне, который вы определили в оконной функции . Вы должны использовать функцию, либо встроенную, либо пользовательскую

Есть и другие мелкие ошибки .

Теперь давайте исправим ошибки

решено 1

Вы должны определить набор данных как

val foos = Seq(
  (Foo(-18, "Z"), "test"),
  (Foo(-11, "G"), "test"),
  (Foo(-8, "A"), "test"),
  (Foo(-4, "C"), "test"),
  (Foo(-1,"F"), "test"))
  .toDF("foo", "test")

что должно дать вам

+--------+----+
|foo     |test|
+--------+----+
|[-18, Z]|test|
|[-11, G]|test|
|[-8, A] |test|
|[-4, C] |test|
|[-1, F] |test|
+--------+----+

root
 |-- foo: struct (nullable = true)
 |    |-- a: integer (nullable = false)
 |    |-- b: string (nullable = true)
 |-- test: string (nullable = true)

Здесь test столбец равен фиктивный столбец , которым можно пренебречь, но был необходим для создания набора данных

решено 2

рабочая udf функция должна быть такой, как показано ниже

def slidingUdf = udf((list1: Seq[Row] )=> {
  if(list1.size < 3) {
    null
  }
  else {
    FooResult(list1(0).getAs[Int]("a"), list1(0).getAs[String]("b"), list1(1).getAs[Int]("a"), list1(1).getAs[String]("b"), list1(2).getAs[Int]("a"), list1(2).getAs[String]("b"), list1(0).getAs[Int]("a") + list1(1).getAs[Int]("a"), list1(0).getAs[String]("b") + list1(1).getAs[String]("b") + list1(2).getAs[String]("b"))
  }
})

Я использовал Row, поскольку - это тип данных для struct столбцов, а также посмотрите, как данные извлекаются из Row

решено 3

Вы должны использовать collect_list встроенную функцию с window функцией как

foos.select(slidingUdf(collect_list("foo").over(sliding_window_spec)).as("test"))
  .filter(col("test").isNotNull)
  .select(col("test.*"))
  .show(false)

Итак, общий рабочий код

val foos = Seq(
  (Foo(-18, "Z"), "test"),
  (Foo(-11, "G"), "test"),
  (Foo(-8, "A"), "test"),
  (Foo(-4, "C"), "test"),
  (Foo(-1,"F"), "test"))
  .toDF("foo", "test")

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
// work on 3 rows
val sliding_window_spec = Window.orderBy(desc("foo.a")).rowsBetween( -2, 0)

def slidingUdf = udf((list1: Seq[Row] )=> {
  if(list1.size < 3) {
    null
  }
  else {
    FooResult(list1(0).getAs[Int]("a"), list1(0).getAs[String]("b"), list1(1).getAs[Int]("a"), list1(1).getAs[String]("b"), list1(2).getAs[Int]("a"), list1(2).getAs[String]("b"), list1(0).getAs[Int]("a") + list1(1).getAs[Int]("a"), list1(0).getAs[String]("b") + list1(1).getAs[String]("b") + list1(2).getAs[String]("b"))
  }
})

foos.select(slidingUdf(collect_list("foo").over(sliding_window_spec)).as("test"))
  .filter(col("test").isNotNull)
  .select(col("test.*"))
  .show(false)

что должно дать вам

+---+---+---+---+---+---+------------+------------+
|a1 |b1 |a2 |b2 |a3 |b3 |computation1|computation2|
+---+---+---+---+---+---+------------+------------+
|-1 |F  |-4 |C  |-8 |A  |-5          |FCA         |
|-4 |C  |-8 |A  |-11|G  |-12         |CAG         |
|-8 |A  |-11|G  |-18|Z  |-19         |AGZ         |
+---+---+---+---+---+---+------------+------------+

Вы можете поиграть для большего обучения и понимания. Надеюсь, ответ более чем полезен;)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...