Мой проект, напишите json в Kafka Topi c и прочитайте json из kafka topi c наконец потопите CSV. Все хорошо. Но какой-то ключ вложенный json. Как я могу проанализировать список в json?
Пример Json:
{"a": "test", "b": "1234", "c": "temp", "d": [{"test1": "car", "test2": 345}, {"test3": "animal", "test4": 1}], "e": 50000}
Вы можете увидеть мой код ниже.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func
spark = SparkSession.builder\
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
.appName('kafka_stream_test')\
.getOrCreate()
ordersSchema = StructType() \
.add("a", StringType()) \
.add("b", StringType()) \
.add("c", StringType()) \
.add("d", StringType())\
.add("e", StringType())
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()\
df_query = df \
.selectExpr("cast(value as string)") \
.select(func.from_json(func.col("value").cast("string"),ordersSchema).alias("parsed"))\
.select("parsed.a","parsed.b","parsed.c","parsed.d","parsed.e","parsed.f")\
df_s = df_query \
.writeStream \
.format("console") \
.outputMode("append") \
.trigger(processingTime = "1 seconds")\
.start()
aa = df_query \
.writeStream \
.format("csv")\
.trigger(processingTime = "5 seconds")\
.option("path", "/var/kafka_stream_test_out/")\
.option("checkpointLocation", "/var/kafka_stream_test_out/chk") \
.start()
df.printSchema()
df_s.awaitTermination()
aa.awaitTermination()
Спасибо!