Как использовать Scala Case Class для отображения источника Кафки в Spark Структурированный Поток - PullRequest
0 голосов
/ 02 июля 2018

Я пытаюсь использовать структурированную потоковую передачу в искре, так как она хорошо подходит для моего варианта использования. Однако, я не могу найти способ отобразить входящие данные из Kafka в класс case. Это то, как далеко я мог пойти на основании официальной документации.

import sparkSession.sqlContext.implicits._                          
val kafkaDF:DataFrame = sparkSession
                                          .readStream
                                          .format("kafka")
                                          .option("kafka.bootstrap.servers", bootstrapServers_CML)
                                          .option("subscribe", topics_ME)
                                          .option("startingOffsets", "latest")
                                          .load()
                                          .selectExpr("cast (value as string) as json") //Kakfa sends data in a specific schema (key, value, topic, offset, timestamp etc)    

val schema_ME = StructType(Seq(
    StructField("Parm1", StringType, true),
    StructField("Parm2", StringType, true),
    StructField("Parm3", TimestampType, true)))  

val mobEventDF:DataFrame = kafkaDF
                         .select(from_json($"json", schema_ME).as("mobEvent")) //Using a StructType to convert to application specific schema. Cant seem to use a case class for schema directly yet. Perhaps with later API??
                         .na.drop()

mobEventDF имеет такую ​​схему, как эта

root
 |-- appEvent: struct (nullable = true)
 |    |-- Parm1: string (nullable = true)
 |    |-- Parm2: string (nullable = true)
 |    |-- Parm3: string (nullable = true)

Есть ли лучший способ сделать это? Как я могу сопоставить это с классом Scala Case, как показано ниже?

case class ME(name: String, 
                 factory: String,
                 delay: Timestamp)

1 Ответ

0 голосов
/ 02 июля 2018

Выберите и переименуйте все поля, а затем вызовите as метод

kafkaDF.select($"mobEvent.*").toDF("name", "factory", "delay").as[ME]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...