Поток Python JSON данных в PySpark Dataframe с заголовком столбца - PullRequest
1 голос
/ 19 июня 2020

Я использую структурированный поток из Kafka Source в PySpark Dataframe. Типы данных, которые предоставляет Kafka, - это JSON с такой структурой:

{
"id":XXX,
"user_id":1,
"status":"PENDING",
...,
}

, которые я хочу передать в потоковом режиме, с выводом, что ключ JSON является заголовком таблицы:

--------------------
id |user_id|status |
--------------------
XXX|1      |PENDING|

Я пытаюсь использовать этот код:

schema = = StructType() \
    .add("id", IntegerType()) \
    .add("user_id", IntegerType()) \
    .add("status", StringType())

src = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers",KAFKA_SERVER)\
    .option("subscribe",KAFKA_TOPIC)\
    .option("startingOffsets","earliest")\
    .load()
    FDR = src.select(from_json(col("value").cast("string"),schema).alias("parsed"))
    query = FDR.writeStream\
        .format("console")\
        .option("truncate", False)\
        .start()

Но ничего не выходит, и он останавливается, ничего не показывая. Любая помощь приветствуется.

1 Ответ

0 голосов
/ 24 июня 2020

Итак, после нескольких исследований я ошибся в выборе запроса. Должно быть так:

.select(
       from_json(col("value").cast("string"),<your_schema>).alias("<your_alias>")
    )
    FDR.select("<your_alias>.*")
...