Как проверить, больше или меньше n последовательных событий из потока kafka, чем пороговое ограничение - PullRequest
1 голос
/ 18 апреля 2019

я новичок в писпарк. Я написал программу pyspark для чтения потока kafka с помощью оконной операции. Я публикую нижеприведенное сообщение для kafka каждую секунду с различными источниками и температурами вместе с отметкой времени.

{"temperature":34,"time":"2019-04-17 12:53:02","source":"1010101"}
{"temperature":29,"time":"2019-04-17 12:53:03","source":"1010101"}
{"temperature":28,"time":"2019-04-17 12:53:04","source":"1010101"}
{"temperature":34,"time":"2019-04-17 12:53:05","source":"1010101"}
{"temperature":45,"time":"2019-04-17 12:53:06","source":"1010101"}
{"temperature":34,"time":"2019-04-17 12:53:07","source":"1010102"}
{"temperature":29,"time":"2019-04-17 12:53:08","source":"1010102"}
{"temperature":28,"time":"2019-04-17 12:53:09","source":"1010102"}
{"temperature":34,"time":"2019-04-17 12:53:10","source":"1010102"}
{"temperature":45,"time":"2019-04-17 12:53:11","source":"1010102"}

Как проверить, превышает ли n последовательных записей температуры для источника пороговое значение (<30 и> 40), а затем опубликовать предупреждения для Kafka. Также, пожалуйста, дайте мне знать, если приведенная ниже программа эффективна для чтения потока kafka или требует каких-либо изменений?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import avg, window, from_json, from_unixtime, unix_timestamp
import uuid

schema = StructType([
    StructField("source", StringType(), True),
    StructField("temperature", FloatType(), True),
    StructField("time", StringType(), True)
])

spark = SparkSession \
    .builder.master("local[8]") \
    .appName("test-app") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 5)

df1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df2 = df1.select(from_json("value", schema).alias(
    "sensors")).select("sensors.*")

df3 = df2.select(df2.source, df2.temperature, from_unixtime(
    unix_timestamp(df2.time, 'yyyy-MM-dd HH:mm:ss')).alias('time'))
df4 = df3.groupBy(window(df3.time, "2 minutes", "1 minutes"),
                  df3.source).agg(avg("temperature"))

query1 = df4.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/temporary-" + str(uuid.uuid4())) \
    .start()

query1.awaitTermination()
...