Я пытаюсь получить сообщения Кафки и обрабатывать их с помощью Spark в автономном режиме. Кафка хранит данные в формате JSON. Я могу получать сообщения Kafka, но не могу проанализировать данные json с помощью определения схемы.
Когда я запускаю команду bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning
для просмотра сообщений kafka в теме kafka, она выдает следующее:
"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"
"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"
И я могу успешно получить эти данные с помощью этого блока кода в Spark:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(col("value").cast("string"))
Схема такая:
df.printSchema()
root
|-- value: string (nullable = true)
А затем записывает этот кадр данных в консоль, и он печатает сообщения kafka:
Batch: 9
-------------------------------------------
+--------------------+
| value|
+--------------------+
|"{\"timestamp\":1...|
+--------------------+
Но я хочу проанализировать данные json, чтобы определить схему и блок кода, который я пытался сделать:
schema = StructType([ StructField("timestamp", LongType(), False), StructField("values", ArrayType( StructType([ StructField("id", StringType(), True), StructField("v", IntegerType(), False), StructField("q", BooleanType(), False), StructField("t", LongType(), False) ]), True ), True) ])
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("opc"))
И схема parsed
кадра данных:
parsed.printSchema()
root
|-- opc: struct (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- values: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- v: integer (nullable = true)
| | |-- q: boolean (nullable = true)
| | |-- t: string (nullable = true)
Эти кодовые блоки работают без ошибок. Но когда я хочу записать parsed
dataframe на консоль:
query = parsed\
.writeStream\
.format("console")\
.start()
query.awaitTermination()
это пишет null
как это в консоли:
+----+
| opc|
+----+
|null|
+----+
Итак, похоже, что есть проблема с анализом данных json, но я не могу понять это.
Можете ли вы сказать мне, что не так?