Spark 2.4 Потоковая передача данных динамического JSON в столбце - PullRequest
0 голосов
/ 04 ноября 2019

Мой вопрос касается Spark Streaming и столбца, который является JSON и столбца func_params. но, как вы видите ниже в определении схемы, я получаю его в виде строки. Дело в том, что сам JSON динамичен. Мне нужно проверить, есть ли определенные значения в JSON и принимать решения на их основе. но я не могу понять, как преобразовать столбец в фактический JSON. Я посмотрел, но у всех есть схема, и в этом случае я не знаю схему заранее.

#Define schema of json coming from kafka
schema = StructType().add("id", "string") \
                 .add("function_params", "string") \
                 .add("srv_source", "string") \
                 .add("user_id", "string") \
                 .add("entrytimestamp", "string")

#load data into spark-structured streaming
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "dic_rest") \
  .load() \
  .selectExpr("CAST(value AS STRING) as message") \
  .select(from_json(col("message").cast("string"), schema).alias("json")) \
  .select("json.*")

#need to code here the column to json and then work with it such 
#as if the json has a key I create an entry for another kafka topic.

Любая помощь будет принята с благодарностью. Я чувствую, что это легко, и я просто делаю это тяжелее, чем на самом деле.

...