Мой вопрос касается Spark Streaming и столбца, который является JSON и столбца func_params. но, как вы видите ниже в определении схемы, я получаю его в виде строки. Дело в том, что сам JSON динамичен. Мне нужно проверить, есть ли определенные значения в JSON и принимать решения на их основе. но я не могу понять, как преобразовать столбец в фактический JSON. Я посмотрел, но у всех есть схема, и в этом случае я не знаю схему заранее.
#Define schema of json coming from kafka
schema = StructType().add("id", "string") \
.add("function_params", "string") \
.add("srv_source", "string") \
.add("user_id", "string") \
.add("entrytimestamp", "string")
#load data into spark-structured streaming
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "dic_rest") \
.load() \
.selectExpr("CAST(value AS STRING) as message") \
.select(from_json(col("message").cast("string"), schema).alias("json")) \
.select("json.*")
#need to code here the column to json and then work with it such
#as if the json has a key I create an entry for another kafka topic.
Любая помощь будет принята с благодарностью. Я чувствую, что это легко, и я просто делаю это тяжелее, чем на самом деле.