Как передать данные через модуль отображения искры, не моделируя их в классе параметров? - PullRequest
0 голосов
/ 24 декабря 2018

Мне нужно выполнить обработку строк данных с сохранением состояния.Для этого мне нужно создать класс bean или case, который моделирует данные, необходимые для обработки с сохранением состояния.Я хотел бы придерживаться других данных в кадре данных для использования после обработки с сохранением состояния без моделирования в классе case.Как это можно сделать?

При обработке без сохранения состояния мы можем как-то оставаться на земле DataFrame, используя UDF, но у нас нет такой опции здесь.

Вот что я пробовал:

package com.example.so

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class WibbleState() // just a placeholder

case class Wibble
(
  x: String,
  y: Int,
  data: Row // data I don't want to model in the case class
)

object PartialModelization {

  def wibbleStateFlatMapper(k: String,
                            it: Iterator[Wibble],
                            state: GroupState[WibbleState]): Iterator[Wibble] = it

  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .appName("PartialModelization")
      .master("local[*]").getOrCreate()

    import spark.implicits._

    // imagine this is actually a streaming data frame
    val input = spark.createDataFrame(List(("a", 1, 0), ("b", 1, 2)))
      .toDF("x", "y", "z")
    // dont want to model z in the case class
    // if that seems pointless imagine there is also z1, z2, z3, etc
    // or that z is itself a struct

    input.select($"x", $"y", struct("*").as("data"))
      .as[Wibble]
      .groupByKey(w => w.x)
      .flatMapGroupsWithState[WibbleState, Wibble](
        OutputMode.Append, GroupStateTimeout.NoTimeout)(wibbleStateFlatMapper)
      .select("data.*")
      .show()

  }

}

Что выдает эту ошибку:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "data")
- root class: "com.example.so.Wibble"

Концептуально вы можете попытаться найти какой-то ключ, который позволяет нам соединиться с выходным фреймом данных с входным, чтобы восстановить атрибут «данные», но это простоС точки зрения производительности и сложности реализации это кажется ужасным решением.(Я бы предпочел просто напечатать всю структуру данных в моем случае классов в этом случае!)

1 Ответ

0 голосов
/ 26 декабря 2018

Лучшее решение, которое я нашел на данный момент, - это использование кортежа для разделения данных сопоставления и данных строк.

Поэтому мы удаляем атрибут данных из Wibble.

case class Wibble
(
  x: String,
  y: Int
)

Измените типы в нашем плоском сопоставителе с состоянием для обработки (Wibble, Row) вместо просто Wibble:

def wibbleStateFlatMapper(k: String,
                          it: Iterator[(Wibble, Row)],
                          state: GroupState[WibbleState]): Iterator[(Wibble, Row)] = it

Теперь наш код конвейера становится:

// imagine this is actually a streaming data frame
val input = spark.createDataFrame(List(("a", 1, 0), ("b", 1, 2)))
  .toDF("x", "y", "z")

val inputEncoder = RowEncoder(input.schema)
val wibbleEncoder = Encoders.product[Wibble]
implicit val tupleEncoder = Encoders.tuple(wibbleEncoder, inputEncoder)

input.select(struct($"x", $"y").as("wibble"), struct("*").as("data"))
  .as(tupleEncoder)
  .groupByKey({case (w,_) => w.x})
  .flatMapGroupsWithState(
    OutputMode.Append, GroupStateTimeout.NoTimeout)(wibbleStateFlatMapper)
  .select("_2.*")
  .show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...