Можно ли в реальном времени анализировать строку JSON из темы Kafka с помощью Spark Streaming SQL? - PullRequest
0 голосов
/ 19 сентября 2018

У меня есть записная книжка Pyspark, которая подключается к брокеру kafka и создает искровой writeStream, называемый temp.Значения данных в теме Kafka представлены в формате json, но я не уверен, как создать таблицу spark sql, которая может анализировать эти данные в режиме реального времени.Единственный известный мне способ - создать копию таблицы, преобразовать ее в RDD или DF и проанализировать значение в другом RDD и DF.Возможно ли сделать это в режиме реального времени при записи потока?

Код:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","localhost:9092") \
    .option("subscribe","hoteth") \
    .option("startingOffsets", "earliest") \
    .load()

ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()

Вывод:

+----+--------------------+--------------------+
| key|               value|           timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+

1 Ответ

0 голосов
/ 19 сентября 2018

Один из способов решить эту проблему - просто посмотреть сбоку json_tuple, как это делается в Hive HQL.Я все еще ищу решение, которое может анализировать данные непосредственно из потока, чтобы не занимать дополнительное время обработки при помощи запроса.

spark.sql("""
    select value, v1.transaction,ticker,price
    from temp 
    lateral view json_tuple(value,"e","s","p") v1 as transaction, ticker,price
    limit 5
    """).show()
...