Схема DataFrame
, созданная из потокового источника:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Таким образом, ключ и значение фактически находятся в Array[Byte]
.Вам придется выполнить десериализацию в операциях с Dataframe.
Например, у меня есть это для десериализации Kafka:
import sparkSession.implicits._
sparkSession.readStream
.format("kafka")
.option("subscribe", topic)
.option(
"kafka.bootstrap.servers",
bootstrapServers
)
.load()
.selectExpr("key", "value") // Selecting only key & value
.as[(Array[Byte], Array[Byte])]
.flatMap {
case (key, value) =>
for {
deserializedKey <- Try {
keyDeserializer.deserialize(topic, key)
}.toOption
deserializedValue <- Try {
valueDeserializer.deserialize(topic, value)
}.toOption
} yield (deserializedKey, deserializedValue)
}
Вам нужно изменить это, чтобы десериализовать ваши записи protobuf. * 1011 *