Выровнять строки в скользящем окне с помощью Spark - PullRequest
0 голосов
/ 09 мая 2018

Я обрабатываю большое количество строк из базы данных или файла, используя Apache Spark. Часть обработки создает скользящее окно из 3 рядов, где строки должны быть сглажены, и для сглаженных рядов выполняются дополнительные вычисления. Ниже приведен упрощенный пример того, что пытается быть сделано.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window

object Main extends App {

  val ss = SparkSession.builder().appName("DataSet Test")
    .master("local[*]").getOrCreate()

  import ss.implicits._

  case class Foo(a:Int, b:String )

  // rows from database or file
  val foos = Seq(Foo(-18, "Z"),
      Foo(-11, "G"),
      Foo(-8, "A"),
      Foo(-4, "C"),
      Foo(-1, "F")).toDS()

  // work on 3 rows
  val sliding_window_spec = Window.orderBy(desc("a")).rowsBetween( -2, 0)

  // flattened object with example computations
  case class FooResult(a1:Int, b1:String, a2:Int, b2:String, a3:Int, b3:String, computation1:Int, computation2:String  )

  // how to convert foo to fooResult???
  // flatten 3 rows into 1 and do additional computations on flattened rows

  // expected results
  val fooResults = Seq(FooResult( -1, "F", -4, "C", -8, "A", -5, "FCA" ),
    FooResult( -4, "C", -8, "A", -11, "G", -12, "CAG" ),
    FooResult( -8, "A", -11, "G", -18, "Z", -19, "AGZ" )).toDS()

   ss.stop()

}

Как я могу преобразовать foos в fooResults? Я использую Apache Spark 2.3.0

1 Ответ

0 голосов
/ 09 мая 2018

// how to convert foo to fooResult??? // flatten 3 rows into 1 and do additional computations on flattened rows

Вы можете просто использовать collect_list встроенную функцию , используя window функцию , которую вы уже определили , а затем, определив функцию udf, вы можете выполнить расчетную часть и выравнивающую часть . наконец, вы можете filter и развернуть столбец struct, чтобы получить конечный желаемый результат как

def slidingUdf = udf((list1: Seq[Int], list2:Seq[String])=> {
  if(list1.size < 3) null
  else {
    val zipped = list1.zip(list2)
    FooResult(zipped(0)._1, zipped(0)._2, zipped(1)._1, zipped(1)._2, zipped(2)._1, zipped(2)._2, zipped(0)._1+zipped(1)._1, zipped(0)._2+zipped(1)._2+zipped(2)._2)
  }
})

foos.select(slidingUdf(collect_list("a").over(sliding_window_spec), collect_list("b").over(sliding_window_spec)).as("test"))
    .filter(col("test").isNotNull)
    .select(col("test.*"))
    .show(false)

что должно дать вам

+---+---+---+---+---+---+------------+------------+
|a1 |b1 |a2 |b2 |a3 |b3 |computation1|computation2|
+---+---+---+---+---+---+------------+------------+
|-1 |F  |-4 |C  |-8 |A  |-5          |FCA         |
|-4 |C  |-8 |A  |-11|G  |-12         |CAG         |
|-8 |A  |-11|G  |-18|Z  |-19         |AGZ         |
+---+---+---+---+---+---+------------+------------+

Примечание: Помните, что классы падежей должны быть определены вне области текущего сеанса

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...