Я использую структурированный поток из Kafka Source в PySpark Dataframe. Типы данных, которые предоставляет Kafka, - это JSON с такой структурой:
{
"id":XXX,
"user_id":1,
"status":"PENDING",
...,
}
, которые я хочу передать в потоковом режиме, с выводом, что ключ JSON является заголовком таблицы:
--------------------
id |user_id|status |
--------------------
XXX|1 |PENDING|
Я пытаюсь использовать этот код:
schema = = StructType() \
.add("id", IntegerType()) \
.add("user_id", IntegerType()) \
.add("status", StringType())
src = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers",KAFKA_SERVER)\
.option("subscribe",KAFKA_TOPIC)\
.option("startingOffsets","earliest")\
.load()
FDR = src.select(from_json(col("value").cast("string"),schema).alias("parsed"))
query = FDR.writeStream\
.format("console")\
.option("truncate", False)\
.start()
Но ничего не выходит, и он останавливается, ничего не показывая. Любая помощь приветствуется.