У меня есть записная книжка Pyspark, которая подключается к брокеру kafka и создает искровой writeStream, называемый temp.Значения данных в теме Kafka представлены в формате json, но я не уверен, как создать таблицу spark sql, которая может анализировать эти данные в режиме реального времени.Единственный известный мне способ - создать копию таблицы, преобразовать ее в RDD или DF и проанализировать значение в другом RDD и DF.Возможно ли сделать это в режиме реального времени при записи потока?
Код:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","hoteth") \
.option("startingOffsets", "earliest") \
.load()
ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()
Вывод:
+----+--------------------+--------------------+
| key| value| timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+