Я читаю сообщения из темы кафки
messageDFRaw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "test-message")\
.load()
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")
Когда я печатаю фрейм данных из вышеприведенного запроса, я получаю следующий вывод на консоль.
|key|dict|
|#badbunny |{"channel": "#badbunny", "username": "mgat22", "message": "cool"}|
Как я могу создатьфрейм данных из DataStreamReader, такой, что у меня есть фрейм данных со столбцами |key|channel| username| message|
. Я попытался следовать принятому ответу в Как читать записи в формате JSON из Kafka с использованием структурированной потоковой передачи?
struct = StructType([
StructField("channel", StringType()),
StructField("username", StringType()),
StructField("message", StringType()),
])
messageDFRaw.select(from_json("CAST(value AS STRING)", struct))
но я получаю Expected type 'StructField', got 'StructType' instead
в from_json()