Spark SQL потоковая передача с Kafka на данных Json: функция from_json не в состоянии проанализировать многострочный JSON из темы Kafka - PullRequest
0 голосов
/ 22 января 2019

Здесь я отправляю данные json в kafka из темы «test», передаю схему json, выполняю некоторые преобразования и печатаю их на консоли.Вот код: -

val kafkadata = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("zookeeper.connect", "localhost:2181")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("max.poll.records", 10)
    .option("failOnDataLoss", false)
    .load()



val schema1 = new StructType()
    .add("id_sales_order", StringType)                               
    .add("item_collection",                                         
    MapType(                                             
      StringType,
      new StructType()
        .add("id", LongType)
        .add("ip", StringType)
        .add("description", StringType)
        .add("temp", LongType)
        .add("c02_level", LongType)
        .add("geo",
          new StructType()
            .add("lat", DoubleType)
            .add("long", DoubleType)
        )
    )
  )



val df = kafkadata.selectExpr("cast (value as string) as 
           json")
           .select(from_json($"json",
schema=schema1).as("data"))
.select($"data.id_sales_order",explode($"data.item_collection"))




 val query = df.writeStream
    .outputMode("append")
    .queryName("table")
    .format("console")
    .start()
  query.awaitTermination()
  spark.stop()

Я отправляю данные в kafka двумя способами: -

1) Однострочная json: -

 {"id_sales_order": "2", "item_collection": {"2": {"id": 10,"ip": "68.28.91.22","description": "Sensor attached to the container ceilings","temp":35,"c02_level": 1475,"geo": { "lat":38.00, "long":97.00}}}}

It is giving me output
+--------------+---+--------------------+
|id_sales_order|key|               value|
+--------------+---+--------------------+
|             2|  2|[10,68.28.91.22,S...|
+--------------+---+--------------------+

2)Многострочный json: -

{
  "id_sales_order": "2",
  "item_collection": {
    "2": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo":
        { "lat":38.00, "long":97.00}
    }
}
}

It is not giving me any output.
+--------------+---+-----+
|id_sales_order|key|value|
+--------------+---+-----+
+--------------+---+-----+

Json, исходящий из источника, похож на 2-й.

Как вы справляетесь с json при чтении потоковых данных из kafka?Я думаю, что проблема может быть в том, что функция from_json не понимает многострочный json.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...