У меня есть поток Kafka, через который я получаю журналы устройств IoT на основе JSON. Я использую pyspark для обработки потока, чтобы проанализировать и создать преобразованный вывод.
Мое устройство json выглядит например:
{"messageid":"1209a714-811d-4ad6-82b7-5797511d159f",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}
{"messageid":"4d119126-2d12-412c-99c2-c159381bee5c",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}
Я пытаюсь преобразовать журналы таким образом, чтобы он возвращал мне уникальный счетчик каждого устройства на основе метки времени и идентификатора датчика. Результат JSON будет выглядеть так:
{
"sensor_id":"CAM_009",
"timestamp":"2020-01-20 19:04:32 +0530",
"location":"General Assembly Area",
count:2
}
Полный код, который я пытаюсь - pyspark-kafka.py
spark = SparkSession.builder.appName('analytics').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
brokers='kafka-mybroker-url-host:9092'
readTopic = 'DetectionEntry'
outTopic = 'DetectionResults'
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",brokers).option("subscribe",readTopic).load()
transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
alert_schema = StructType() \
.add("message_id", StringType()) \
.add("mdsversion", StringType()) \
.add("timestamp", StringType()) \
.add("sensor_id", StringType()) \
.add("location", StringType()) \
.add("detection_class", StringType()) \
transaction_detail_df2 = transaction_detail_df1\
.select(from_json(col("value"), alert_schema).alias("alerts"))
transaction_detail_df3 = transaction_detail_df2.select("alerts.*")
transaction_detail_df3 = transaction_detail_df3.withColumn("timestamp",to_timestamp(col("timestamp"),"YYYY-MM-DD HH:mm:ss SSSS")).withWatermark("timestamp", "500 milliseconds")
tempView = transaction_detail_df3.createOrReplaceTempView("alertsview")
results = spark.sql("select sensor_id, timestamp, location, count(*) as count from alertsview group by sensor_id, timestamp, location")
results.printSchema()
results_kakfa_output = results
results_kakfa_output.writeStream \
.format("console") \
.outputMode("append") \
.trigger(processingTime='3 seconds') \
.start().awaitTermination()
Когда я запускаю этот код, я получаю следующий вывод. Общая цель состоит в том, чтобы обрабатывать все журналы устройства с интервалом в 3 секунды и находить уникальные значения для каждой записи метки времени для устройства в течение интервала времени. Я пробовал запрос SQL в базе данных MySQL с той же схемой, и он работает нормально. Тем не менее, я не получаю результатов здесь в выводе для дальнейшей обработки. Я не могу понять, что мне здесь не хватает.