I'm trying to deserialize Kafka message values into case class instances. (I produce the messages on the other side.)
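For reference, TextRecord is a plain case class along these lines (reconstructed from the schema printed below; java.sql.Timestamp is what the Spark encoder maps a timestamp column to):

case class TextRecord(
  eventTime: java.sql.Timestamp,  // field names/types inferred from the printed schema
  lineLength: Int,
  windDirection: Float,
  windSpeed: Float,
  gustSpeed: Float,
  waveHeight: Float,
  dominantWavePeriod: Float,
  averageWavePeriod: Float,
  mWaveDirection: Float,
  seaLevelPressure: Float,
  airTemp: Float,
  waterSurfaceTemp: Float,
  dewPointTemp: Float,
  visibility: Float,
  pressureTendency: Float,
  tide: Float
)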
This code:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions._
import ss.implicits._

val enc: Encoder[TextRecord] = Encoders.product[TextRecord]

// "deserialize" turns the raw Kafka value bytes back into a TextRecord
ss.udf.register("deserialize", (bytes: Array[Byte]) =>
  DefSer.deserialize(bytes).asInstanceOf[TextRecord]
)

val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

inputStream.printSchema

val records = inputStream
  .selectExpr("deserialize(value) AS record")

records.printSchema

val rec2 = records.as(enc)
rec2.printSchema
produces this output:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- record: struct (nullable = true)
| |-- eventTime: timestamp (nullable = true)
| |-- lineLength: integer (nullable = false)
| |-- windDirection: float (nullable = false)
| |-- windSpeed: float (nullable = false)
| |-- gustSpeed: float (nullable = false)
| |-- waveHeight: float (nullable = false)
| |-- dominantWavePeriod: float (nullable = false)
| |-- averageWavePeriod: float (nullable = false)
| |-- mWaveDirection: float (nullable = false)
| |-- seaLevelPressure: float (nullable = false)
| |-- airTemp: float (nullable = false)
| |-- waterSurfaceTemp: float (nullable = false)
| |-- dewPointTemp: float (nullable = false)
| |-- visibility: float (nullable = false)
| |-- pressureTendency: float (nullable = false)
| |-- tide: float (nullable = false)
When I get to the sink:
val debugOut = rec2.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugOut.awaitTermination()
Catalyst complains:
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`eventTime`' given input columns: [record];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
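As I read it, the encoder expects eventTime, lineLength, and the rest of the TextRecord fields as top-level columns, but after the selectExpr the frame only has the single struct column record. I assume something like this flattening is what's needed (an untested sketch; "record.*" star-expansion on the streaming DataFrame is the assumption here):

// lift the struct's fields up to the root of the schema
val flattened = records.select("record.*")
flattened.printSchema  // should now show eventTime, lineLength, ... at the top level
val typedRecords = flattened.as(enc)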
I've tried several things to "pull the TextRecord up" to the top level, e.g. calling rec2.map(r => r.getAs[TextRecord](0)) and explode("record"), but I keep running into ClassCastExceptions (one attempt is spelled out below).
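For completeness, here is roughly how that map attempt was spelled out (a sketch with the encoder passed explicitly; presumably the column arrives as a GenericRowWithSchema rather than a TextRecord, hence the ClassCastException):

import org.apache.spark.sql.Row

// getAs compiles, but at runtime the "record" column holds a Row,
// so the cast to TextRecord blows up when the result is consumed
val attempt = records.map((r: Row) => r.getAs[TextRecord]("record"))(enc)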