Здесь я отправляю данные json в kafka из темы «test», передаю схему json, выполняю некоторые преобразования и печатаю их на консоли.Вот код: -
val kafkadata = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("zookeeper.connect", "localhost:2181")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
val schema1 = new StructType()
.add("id_sales_order", StringType)
.add("item_collection",
MapType(
StringType,
new StructType()
.add("id", LongType)
.add("ip", StringType)
.add("description", StringType)
.add("temp", LongType)
.add("c02_level", LongType)
.add("geo",
new StructType()
.add("lat", DoubleType)
.add("long", DoubleType)
)
)
)
val df = kafkadata.selectExpr("cast (value as string) as
json")
.select(from_json($"json",
schema=schema1).as("data"))
.select($"data.id_sales_order",explode($"data.item_collection"))
val query = df.writeStream
.outputMode("append")
.queryName("table")
.format("console")
.start()
query.awaitTermination()
spark.stop()
Я отправляю данные в kafka двумя способами: -
1) Однострочная json: -
{"id_sales_order": "2", "item_collection": {"2": {"id": 10,"ip": "68.28.91.22","description": "Sensor attached to the container ceilings","temp":35,"c02_level": 1475,"geo": { "lat":38.00, "long":97.00}}}}
It is giving me output
+--------------+---+--------------------+
|id_sales_order|key| value|
+--------------+---+--------------------+
| 2| 2|[10,68.28.91.22,S...|
+--------------+---+--------------------+
2)Многострочный json: -
{
"id_sales_order": "2",
"item_collection": {
"2": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":35,
"c02_level": 1475,
"geo":
{ "lat":38.00, "long":97.00}
}
}
}
It is not giving me any output.
+--------------+---+-----+
|id_sales_order|key|value|
+--------------+---+-----+
+--------------+---+-----+
Json, исходящий из источника, похож на 2-й.
Как вы справляетесь с json при чтении потоковых данных из kafka?Я думаю, что проблема может быть в том, что функция from_json не понимает многострочный json.