Spark SQL Преобразование не возвращает данных (структурированный поток) - PullRequest
0 голосов
/ 20 января 2020

У меня есть поток Kafka, через который я получаю журналы устройств IoT на основе JSON. Я использую pyspark для обработки потока, чтобы проанализировать и создать преобразованный вывод.

Мое устройство json выглядит например:

{"messageid":"1209a714-811d-4ad6-82b7-5797511d159f",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}

{"messageid":"4d119126-2d12-412c-99c2-c159381bee5c",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}

Я пытаюсь преобразовать журналы таким образом, чтобы он возвращал мне уникальный счетчик каждого устройства на основе метки времени и идентификатора датчика. Результат JSON будет выглядеть так:

{
"sensor_id":"CAM_009",
"timestamp":"2020-01-20 19:04:32 +0530",
"location":"General Assembly Area",
count:2
}

Полный код, который я пытаюсь - pyspark-kafka.py

spark = SparkSession.builder.appName('analytics').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
brokers='kafka-mybroker-url-host:9092'
readTopic = 'DetectionEntry'
outTopic = 'DetectionResults'

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",brokers).option("subscribe",readTopic).load()

transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
alert_schema = StructType() \
    .add("message_id", StringType()) \
    .add("mdsversion", StringType()) \
    .add("timestamp", StringType()) \
    .add("sensor_id", StringType()) \
    .add("location", StringType()) \
    .add("detection_class", StringType()) \

transaction_detail_df2 = transaction_detail_df1\
        .select(from_json(col("value"), alert_schema).alias("alerts"))

transaction_detail_df3 = transaction_detail_df2.select("alerts.*")
transaction_detail_df3 = transaction_detail_df3.withColumn("timestamp",to_timestamp(col("timestamp"),"YYYY-MM-DD HH:mm:ss SSSS")).withWatermark("timestamp", "500 milliseconds")

tempView = transaction_detail_df3.createOrReplaceTempView("alertsview")
results = spark.sql("select sensor_id, timestamp, location, count(*) as count from alertsview group by sensor_id, timestamp, location")
results.printSchema()

results_kakfa_output = results
results_kakfa_output.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='3 seconds') \
    .start().awaitTermination()

Когда я запускаю этот код, я получаю следующий вывод. Общая цель состоит в том, чтобы обрабатывать все журналы устройства с интервалом в 3 секунды и находить уникальные значения для каждой записи метки времени для устройства в течение интервала времени. Я пробовал запрос SQL в базе данных MySQL с той же схемой, и он работает нормально. Тем не менее, я не получаю результатов здесь в выводе для дальнейшей обработки. Я не могу понять, что мне здесь не хватает.

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...