Как проанализировать строковый столбец json в DataStreamReader pyspark и создать фрейм данных - PullRequest
0 голосов
/ 15 февраля 2019

Я читаю сообщения из темы кафки

messageDFRaw = spark.readStream\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "localhost:9092")\
                    .option("subscribe", "test-message")\
                    .load()

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")

Когда я печатаю фрейм данных из вышеприведенного запроса, я получаю следующий вывод на консоль.

|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|

Как я могу создатьфрейм данных из DataStreamReader, такой, что у меня есть фрейм данных со столбцами |key|channel| username| message|

. Я попытался следовать принятому ответу в Как читать записи в формате JSON из Kafka с использованием структурированной потоковой передачи?

struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])

messageDFRaw.select(from_json("CAST(value AS STRING)", struct))

но я получаю Expected type 'StructField', got 'StructType' instead в from_json()

1 Ответ

0 голосов
/ 17 февраля 2019

Я проигнорировал предупреждение Expected type 'StructField', got 'StructType' instead в from_json().

Однако мне сначала пришлось привести значение из сообщения kafka, а затем проанализировать схему json.

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

messageParsedDF = messageDF.select(from_json("value", struct_schema).alias("message"))

messageFlattenedDF = messageParsedDF.selectExpr("value.channel", "value.username", "value.message")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...