Это работает только в Spark 2.1 или выше
Возможно то, что вы хотите сделать, но это может быть очень дорого.
Во-первых, давайте создадим некоторый тестданные.В качестве общего совета, когда вы спрашиваете что-то о Stackoverflow, пожалуйста, предоставьте что-то похожее на это, чтобы у людей было с чего начать.
import spark.sqlContext.implicits._
import org.apache.spark.sql.functions._
val df = List(
(1, "OMG I like bananas", 1),
(1, "Bananas are the best", 2),
(1, "MAN I love banana", 3),
(2, "ORLY? I'm more into grapes", 1),
(2, "BUT I like apples too", 2),
(2, "unless you count veggies", 3),
(2, "THEN don't forget tomatoes", 4)
).toDF("ID", "text", "timestamps")
Чтобы привести столбец с собранными текстами в порядок, нам нужно добавитьНовый столбец с использованием оконной функции.
Использование оболочки spark:
scala> val df2 = df.withColumn("coll", collect_list("text").over(Window.partitionBy("id").orderBy("timestamps")))
df2: org.apache.spark.sql.DataFrame = [ID: int, text: string ... 2 more fields]
scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]
scala> x.collect.foreach(println)
[1,WrappedArray(OMG I like bananas, Bananas are the best, MAN I love banana)]
[2,WrappedArray(ORLY? I'm more into grapes, BUT I like apples too, unless you count veggies, THEN don't forget tomatoes)]
Чтобы получить реальный текст, нам может понадобиться UDF.Вот мой (я далеко не эксперт в Scala, так что терпите меня)
import scala.collection.mutable
val aggText: Seq[String] => String = (list: Seq[String]) => {
def tex(arr: Seq[String], accum: Seq[String]): Seq[String] = arr match {
case Seq() => accum
case Seq(single) => accum :+ single
case Seq(str, xs @_*) => if (str.length >= 2 && !(str.charAt(0).isUpper && str.charAt(1).isUpper))
tex(Nil, accum :+ str )
else
tex(xs, accum :+ str)
}
val res = tex(list, Seq())
res.mkString(" ")
}
val textUDF = udf(aggText(_: mutable.WrappedArray[String]))
Итак, у нас есть информационный фрейм с собранными текстами в правильном порядке и функция Scala (обернутая какUDF).Давайте поделим это вместе:
scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]
scala> val y = x.select($"ID", textUDF($"texts"))
y: org.apache.spark.sql.DataFrame = [ID: int, UDF(texts): string]
scala> y.collect.foreach(println)
[1,OMG I like bananas Bananas are the best]
[2,ORLY? I'm more into grapes BUT I like apples too unless you count veggies]
scala>
Я думаю, что это результат, который вы хотите.