Структурированная группа обработки паромПопулярный сбор коллекций нельзя использовать при частичных объединениях - PullRequest
0 голосов
/ 22 мая 2018
stateDF
.withWatermark("t","1 seconds")
.groupBy(window($"t","1 minutes","1 minutes"),$"hid")
.agg(collect_list("id"))
.writeStream.outputMode("append")
.format("console").trigger(ProcessingTime("1 minutes"))
.start().awaitTermination()

Когда я добавлю 'collect_list', у меня возникнет эта проблема.Но с помощью искрового сердечника это можно сделать.ОШИБКА:

java.lang.RuntimeException: Collect не может использоваться в частичных агрегациях.в scala.sys.package $ .error (package.scala: 27) в ...

java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748) введите код здесь

1 Ответ

0 голосов
/ 23 мая 2018

Благодаря моему исследованию у меня есть два решения этой проблемы.

Methed 1: изменение исходного кода с помощью SPARK-1893 , но я не рекомендую делать это.

Methed 2: Создание пользовательских агрегатных функций (UDAF) для себя. Хотя это хлопотно, но эффективно. Ниже приведен мой код, добро пожаловать, правильно!

class CollectList extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(StructField("id", StringType, nullable = true) :: StructField("state", StringType, nullable = true):: Nil)

  override def bufferSchema: StructType = StructType(StructField("ids", ArrayType(StringType, containsNull = true), nullable = true) :: Nil)

  override def dataType: ArrayType = ArrayType(StringType, containsNull = true)

  override def deterministic: Boolean = false

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = null
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.get(0) == null){
      buffer(0) = Array(input.getString(0) + "_" + input.getString(1))
    }
    else {
      val s = input.getString(0) + "_" + input.getString(1)
      val b = buffer.getAs[Seq[String]](0)
      buffer(0) = b :+ s
    }

  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer1.getAs[Seq[String]](0) == null){
      buffer1(0) = buffer2.getAs[Seq[String]](0).distinct
    }
    else {
      buffer1(0) = (buffer1.getAs[Seq[String]](0) ++ buffer2.getAs[Seq[String]](0)).distinct
    }
  }

  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[String]](0)

}
...