Преобразование DataFrame в набор данных (scala) - PullRequest
1 голос
/ 23 марта 2019

Я пытаюсь распаковать значения сообщения Kafka в экземпляры класса case.(Я помещаю сообщения на другой стороне.)

Этот код:


    import ss.implicits._
    import org.apache.spark.sql.functions._

    val enc: Encoder[TextRecord] = Encoders.product[TextRecord]
    ss.udf.register("deserialize", (bytes: Array[Byte]) => {
      DefSer.deserialize(bytes).asInstanceOf[TextRecord] }
    )

    val inputStream = ss.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    inputStream.printSchema

    val records = inputStream
        .selectExpr(s"deserialize(value) AS record")

    records.printSchema

    val rec2 = records.as(enc)

    rec2.printSchema

производит такой вывод:



root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- record: struct (nullable = true)
 |    |-- eventTime: timestamp (nullable = true)
 |    |-- lineLength: integer (nullable = false)
 |    |-- windDirection: float (nullable = false)
 |    |-- windSpeed: float (nullable = false)
 |    |-- gustSpeed: float (nullable = false)
 |    |-- waveHeight: float (nullable = false)
 |    |-- dominantWavePeriod: float (nullable = false)
 |    |-- averageWavePeriod: float (nullable = false)
 |    |-- mWaveDirection: float (nullable = false)
 |    |-- seaLevelPressure: float (nullable = false)
 |    |-- airTemp: float (nullable = false)
 |    |-- waterSurfaceTemp: float (nullable = false)
 |    |-- dewPointTemp: float (nullable = false)
 |    |-- visibility: float (nullable = false)
 |    |-- pressureTendency: float (nullable = false)
 |    |-- tide: float (nullable = false)

Когда я добираюсь до раковины



    val debugOut = rec2.writeStream
      .format("console")
      .option("truncate", "false")
      .start()

    debugOut.awaitTermination()

Катализатор жалуется:



Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`eventTime`' given input columns: [record];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

Я пробовал несколько вещей, чтобы "подтянуть TextRecord вверх", вызывая rec2.map(r=>r.getAs[TextRecord](0)), explode("record") и т. Д., Но столкнуться сClassCastExceptions.

1 Ответ

1 голос
/ 24 марта 2019

Самый простой способ сделать это - напрямую сопоставить экземпляры inputStream Row с TextRecord, предполагая, что это класс case, с помощью функции map

import ss.implicits._

val inputStream = ss.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

val records = inputStream.map(row => 
  DefSer.deserialize(row.getAs[Array[Byte]]("value")).asInstanceOf[TextRecord]
)

records будет непосредственно Dataset[TextRecord].

Кроме того, до тех пор, пока вы импортируете импликации SparkSession, вам не нужно предоставлять класс кодировщика для вашего класса дел, Scala сделает это неявно для вас.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...