Перебор сгруппированного набора данных в Spark 1.6 - PullRequest
0 голосов
/ 14 февраля 2019

В упорядоченном наборе данных я хочу агрегировать данные до тех пор, пока не будет выполнено условие, но сгруппированы по определенному ключу.

Чтобы задать некоторый контекст для моего вопроса, я упрощаю свою задачу до следующей формулировки проблемы:

В spark мне нужно агрегировать строки, сгруппированные по ключу, когда пользователь перестает «кричать» (второй символ в строке не в верхнем регистре).

Пример набора данных:

ID, text, timestamps

1, "OMG I like bananas", 123
1, "Bananas are the best", 234
1, "MAN I love banana", 1235
2, "ORLY? I'm more into grapes", 123565
2, "BUT I like apples too", 999
2, "unless you count veggies", 9999
2, "THEN don't forget tomatoes", 999999

Ожидаемый результат будет:

1, "OMG I like bananas Bananas are the best"
2, "ORLY? I'm more into grapes BUT I like apples too unless you count veggies"

через groupby и agg. Кажется, я не могу установить условие "остановка, когда найден символ в верхнем регистре".

1 Ответ

0 голосов
/ 21 февраля 2019

Это работает только в Spark 2.1 или выше

Возможно то, что вы хотите сделать, но это может быть очень дорого.

Во-первых, давайте создадим некоторый тестданные.В качестве общего совета, когда вы спрашиваете что-то о Stackoverflow, пожалуйста, предоставьте что-то похожее на это, чтобы у людей было с чего начать.

import spark.sqlContext.implicits._
import org.apache.spark.sql.functions._

val df = List(
    (1,  "OMG I like bananas", 1),
    (1, "Bananas are the best", 2),
    (1, "MAN I love banana", 3),
    (2, "ORLY? I'm more into grapes", 1),
    (2, "BUT I like apples too", 2),
    (2, "unless you count veggies", 3),
    (2, "THEN don't forget tomatoes", 4)
).toDF("ID", "text", "timestamps")

Чтобы привести столбец с собранными текстами в порядок, нам нужно добавитьНовый столбец с использованием оконной функции.

Использование оболочки spark:

scala> val df2 = df.withColumn("coll", collect_list("text").over(Window.partitionBy("id").orderBy("timestamps")))
df2: org.apache.spark.sql.DataFrame = [ID: int, text: string ... 2 more fields]

scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]

scala> x.collect.foreach(println)
[1,WrappedArray(OMG I like bananas, Bananas are the best, MAN I love banana)]
[2,WrappedArray(ORLY? I'm more into grapes, BUT I like apples too, unless you count veggies, THEN don't forget tomatoes)]

Чтобы получить реальный текст, нам может понадобиться UDF.Вот мой (я далеко не эксперт в Scala, так что терпите меня)

import scala.collection.mutable

val aggText: Seq[String] => String = (list: Seq[String]) => {
    def tex(arr: Seq[String], accum: Seq[String]): Seq[String] = arr match {
        case Seq() => accum
        case Seq(single) => accum :+ single
        case Seq(str, xs @_*) => if (str.length >= 2 && !(str.charAt(0).isUpper && str.charAt(1).isUpper))
            tex(Nil, accum :+ str )
        else
            tex(xs, accum :+ str)
    }

    val res = tex(list, Seq())
    res.mkString(" ")
}

val textUDF = udf(aggText(_: mutable.WrappedArray[String]))

Итак, у нас есть информационный фрейм с собранными текстами в правильном порядке и функция Scala (обернутая какUDF).Давайте поделим это вместе:

scala> val x = df2.groupBy("ID").agg(max($"coll").as("texts"))
x: org.apache.spark.sql.DataFrame = [ID: int, texts: array<string>]

scala> val y = x.select($"ID", textUDF($"texts"))
y: org.apache.spark.sql.DataFrame = [ID: int, UDF(texts): string]

scala> y.collect.foreach(println)
[1,OMG I like bananas Bananas are the best]
[2,ORLY? I'm more into grapes BUT I like apples too unless you count veggies]

scala>

Я думаю, что это результат, который вы хотите.

...