Я пытаюсь использовать структурированную потоковую передачу в искре, так как она хорошо подходит для моего варианта использования. Однако, я не могу найти способ отобразить входящие данные из Kafka в класс case. Это то, как далеко я мог пойти на основании официальной документации.
import sparkSession.sqlContext.implicits._
val kafkaDF:DataFrame = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers_CML)
.option("subscribe", topics_ME)
.option("startingOffsets", "latest")
.load()
.selectExpr("cast (value as string) as json") //Kakfa sends data in a specific schema (key, value, topic, offset, timestamp etc)
val schema_ME = StructType(Seq(
StructField("Parm1", StringType, true),
StructField("Parm2", StringType, true),
StructField("Parm3", TimestampType, true)))
val mobEventDF:DataFrame = kafkaDF
.select(from_json($"json", schema_ME).as("mobEvent")) //Using a StructType to convert to application specific schema. Cant seem to use a case class for schema directly yet. Perhaps with later API??
.na.drop()
mobEventDF имеет такую схему, как эта
root
|-- appEvent: struct (nullable = true)
| |-- Parm1: string (nullable = true)
| |-- Parm2: string (nullable = true)
| |-- Parm3: string (nullable = true)
Есть ли лучший способ сделать это? Как я могу сопоставить это с классом Scala Case, как показано ниже?
case class ME(name: String,
factory: String,
delay: Timestamp)