Spark Catalyst flatMapGroupsWithState: состояние группы с отсортированной коллекцией - PullRequest
0 голосов
/ 05 ноября 2018

Я пытаюсь создать отсортированную коллекцию в состоянии моих групп и получаю ошибку от катализатора, которая, как мне кажется, касается создания экземпляра по умолчанию для коллекции.

Ниже приведен упрощенный конвейер, демонстрирующий ошибку:

package com.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}

import scala.collection.immutable.TreeMap

case class Event
(
  key: String
)

case class KeyState
(
  prop: TreeMap[Long, String]
)

object CatalystIssue {

  def updateState(k: String, vs: Iterator[Event], 
    state: GroupState[KeyState]) : Iterator[Event] = vs

  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("CatalystIssue")
      .getOrCreate()

    import spark.implicits._

    val df = spark.readStream.format("rate")
      .load()
      .select(lit("a").as("key"))
      .as[Event]
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Append(),
         GroupStateTimeout.NoTimeout())(updateState)

    val query = df.writeStream.format("console")
      .trigger(Trigger.ProcessingTime("30 seconds")).start()

    query.awaitTermination()
  }
}

Что выдает ошибку:

ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 53, Column 106: No applicable constructor/method found for zero actual parameters; candidates are: "public scala.collection.mutable.Builder scala.collection.generic.SortedMapFactory.newBuilder(scala.math.Ordering)"

Это может быть связано с тем, что Sorted Maps не поддерживаются как тип атрибута dataframe , хотя это не мое намерение, и я бы подумал, что KeyState был бы непрозрачным для запуска, так как вы на самом деле к нему не обращаетесь как атрибут dataframe.

Хотя не очень привлекательным вариантом может быть сериализация отсортированного набора в байтовый массив, который является атрибутом KeyState. т.е.

case class KeyState
(
  prop: Array[Byte]
)

Если бы использовалась Сериализация Java, это сохраняло бы внутреннюю древовидную структуру TreeMap, так, чтобы по крайней мере это не должно было быть восстановлено? Существуют ли альтернативные технологии сериализации, которые бы сохранили структуру?

Кажется полезным иметь возможность сохранять некоторые отсортированные коллекции в состоянии группы, особенно если предположить, что вычисления в основном выполняются в памяти. Есть ли что-то в том, как работает искра, которая делает это принципиально неработоспособным?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...