Чтение вложенного Json в Spark-структурированном потоке - PullRequest
0 голосов
/ 11 июня 2019

Я пытаюсь читать данные из Кафки, используя структурированную потоковую передачу.Данные, полученные от Кафки, представлены в формате json.Я использую образец json для создания схемы, а затем в коде я использую функцию from_json для преобразования json в информационный фрейм для дальнейшей обработки.Проблема, с которой я сталкиваюсь, связана с вложенной схемой и несколькими значениями.Пример схемы определяет тег (скажем, a) как структуру.Данные json, считанные с kafka, могут иметь одно или несколько значений для одного и того же тега (в двух разных значениях).

val df0= spark.read.format("json").load("contactSchema0.json")
val schema0 = df0.schema
val df1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1:9092").option("subscribe", "my_first_topic").load()
val df2 = df1.selectExpr("CAST(value as STRING)").toDF()
val df3 = df2.select(from_json($"value",schema0).alias("value")) 

contactSchema0.json имеет следующий пример тега:

"contactList": {
        "contact": [{
          "id": 1001
},
{
 "id": 1002
}]
}

Таким образом, контакт выводится как структура.Но данные JSON, считанные из Kafka, также могут иметь следующие данные:

"contactList": {
                "contact": {
                  "id": 1001
        }
    }

Так что, если я определю схему как структуру, spark.json не сможет вывести отдельные значения, а в случае, если я определю схемутак как строка spark.json не может вывести несколько значений.

1 Ответ

1 голос
/ 11 июня 2019

Не удается найти такую ​​функцию в Параметры JSON Spark , но у Джексона DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, как описано в в этом ответе .

Так что мы можем обойтись чем-то вроде этого

case class MyModel(contactList: ContactList)
case class ContactList(contact: Array[Contact])
case class Contact(id: Int)

val txt =
  """|{"contactList": {"contact": [{"id": 1001}]}}
     |{"contactList": {"contact": {"id": 1002}}}"""
    .stripMargin.lines.toSeq.toDS()

txt
  .mapPartitions[MyModel] { it: Iterator[String] =>
    val reader = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
      .readerFor(classOf[MyModel])
    it.map(reader.readValue[MyModel])
  }
  .show()

Выход:

+-----------+
|contactList|
+-----------+
| [[[1001]]]|
| [[[1002]]]|
+-----------+

Обратите внимание, что для получения Dataset в вашем коде вы можете использовать

val df2 = df1.selectExpr("CAST(value as STRING)").as[String]

вместо этого, а затем позвоните mapPartitions для df2, как и раньше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...