Мне нужно выполнить обработку строк данных с сохранением состояния.Для этого мне нужно создать класс bean или case, который моделирует данные, необходимые для обработки с сохранением состояния.Я хотел бы придерживаться других данных в кадре данных для использования после обработки с сохранением состояния без моделирования в классе case.Как это можно сделать?
При обработке без сохранения состояния мы можем как-то оставаться на земле DataFrame, используя UDF, но у нас нет такой опции здесь.
Вот что я пробовал:
package com.example.so
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
case class WibbleState() // just a placeholder
case class Wibble
(
x: String,
y: Int,
data: Row // data I don't want to model in the case class
)
object PartialModelization {
def wibbleStateFlatMapper(k: String,
it: Iterator[Wibble],
state: GroupState[WibbleState]): Iterator[Wibble] = it
def main(args: Array[String]) {
val spark = SparkSession.builder()
.appName("PartialModelization")
.master("local[*]").getOrCreate()
import spark.implicits._
// imagine this is actually a streaming data frame
val input = spark.createDataFrame(List(("a", 1, 0), ("b", 1, 2)))
.toDF("x", "y", "z")
// dont want to model z in the case class
// if that seems pointless imagine there is also z1, z2, z3, etc
// or that z is itself a struct
input.select($"x", $"y", struct("*").as("data"))
.as[Wibble]
.groupByKey(w => w.x)
.flatMapGroupsWithState[WibbleState, Wibble](
OutputMode.Append, GroupStateTimeout.NoTimeout)(wibbleStateFlatMapper)
.select("data.*")
.show()
}
}
Что выдает эту ошибку:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "data")
- root class: "com.example.so.Wibble"
Концептуально вы можете попытаться найти какой-то ключ, который позволяет нам соединиться с выходным фреймом данных с входным, чтобы восстановить атрибут «данные», но это простоС точки зрения производительности и сложности реализации это кажется ужасным решением.(Я бы предпочел просто напечатать всю структуру данных в моем случае классов в этом случае!)