Искра возникла проблема с окном потоковой передачи - PullRequest
0 голосов
/ 13 мая 2019

У меня проблема с обновлением окна в Spark Structed Streaming.Я хочу сгруппировать данные, которые я получаю непрерывно из источника Кафки, в скользящем окне и посчитать количество данных.Проблема заключается в том, что writestream передает поток данных окна каждый раз, когда поступают данные, и обновляет счетчик текущего окна.

Я использую следующий код для создания окна:

#Define schema of the topic to be consumed 
jsonSchema = StructType([ StructField("State", StringType(), True) \
                        , StructField("Value", StringType(), True) \
                        , StructField("SourceTimestamp", StringType(), True) \
                        , StructField("Tag", StringType(), True)
                        ])


spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .config("spark.default.parallelism", "100") \
    .getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "10.129.140.23:9092") \
  .option("subscribe", "SIMULATOR.SUPERMAN.TOTO") \
  .load() \
  .select(from_json(col("value").cast("string"), jsonSchema).alias("data")) \
  .select("data.*")

df = df.withColumn("time", current_timestamp())

Window = df \
    .withColumn("window",window("time","4 seconds","1 seconds")).groupBy("window").count() \
    .withColumn("time", current_timestamp())

#Write back to kafka

query = Window.select(to_json(struct("count","window","time")).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "10.129.140.23:9092") \
    .outputMode("update") \
    .option("topic", "structed") \
    .option("checkpointLocation", "/home/superman/notebook/checkpoint") \
    .start()

Окна не сортируются и обновляются каждый раз при изменении количества.Как мы можем ждать конца окна и транслировать окончательный счет один раз.Вместо этого выведите:

{"count":21,"window":{"start":"2019-05-13T09:39:14.000Z","end":"2019-05-13T09:39:18.000Z"},"time":"2019-05-13T09:39:15.026Z"}
{"count":47,"window":{"start":"2019-05-13T09:39:12.000Z","end":"2019-05-13T09:39:16.000Z"},"time":"2019-05-13T09:39:15.026Z"}
{"count":21,"window":{"start":"2019-05-13T09:39:13.000Z","end":"2019-05-13T09:39:17.000Z"},"time":"2019-05-13T09:39:15.026Z"}
{"count":21,"window":{"start":"2019-05-13T09:39:15.000Z","end":"2019-05-13T09:39:19.000Z"},"time":"2019-05-13T09:39:15.026Z"}
{"count":21,"window":{"start":"2019-05-13T09:39:16.000Z","end":"2019-05-13T09:39:20.000Z"},"time":"2019-05-13T09:39:17.460Z"}
{"count":42,"window":{"start":"2019-05-13T09:39:14.000Z","end":"2019-05-13T09:39:18.000Z"},"time":"2019-05-13T09:39:17.460Z"}
{"count":42,"window":{"start":"2019-05-13T09:39:15.000Z","end":"2019-05-13T09:39:19.000Z"},"time":"2019-05-13T09:39:17.460Z"}
{"count":21,"window":{"start":"2019-05-13T09:39:17.000Z","end":"2019-05-13T09:39:21.000Z"},"time":"2019-05-13T09:39:17.460Z"}
{"count":40,"window":{"start":"2019-05-13T09:39:16.000Z","end":"2019-05-13T09:39:20.000Z"},"time":"2019-05-13T09:39:19.818Z"}
{"count":19,"window":{"start":"2019-05-13T09:39:19.000Z","end":"2019-05-13T09:39:23.000Z"},"time":"2019-05-13T09:39:19.818Z"}
{"count":19,"window":{"start":"2019-05-13T09:39:18.000Z","end":"2019-05-13T09:39:22.000Z"},"time":"2019-05-13T09:39:19.818Z"}
{"count":40,"window":{"start":"2019-05-13T09:39:17.000Z","end":"2019-05-13T09:39:21.000Z"},"time":"2019-05-13T09:39:19.818Z"}
{"count":37,"window":{"start":"2019-05-13T09:39:19.000Z","end":"2019-05-13T09:39:23.000Z"},"time":"2019-05-13T09:39:21.939Z"}
{"count":18,"window":{"start":"2019-05-13T09:39:21.000Z","end":"2019-05-13T09:39:25.000Z"},"time":"2019-05-13T09:39:21.939Z"}

Мне бы хотелось:

{"count":47,"window":{"start":"2019-05-13T09:39:12.000Z","end":"2019-05-13T09:39:16.000Z"},"time":"2019-05-13T09:39:15.026Z"}
{"count":21,"window":{"start":"2019-05-13T09:39:13.000Z","end":"2019-05-13T09:39:17.000Z"},"time":"2019-05-13T09:39:15.026Z"}
{"count":42,"window":{"start":"2019-05-13T09:39:14.000Z","end":"2019-05-13T09:39:18.000Z"},"time":"2019-05-13T09:39:17.460Z"}
{"count":42,"window":{"start":"2019-05-13T09:39:15.000Z","end":"2019-05-13T09:39:19.000Z"},"time":"2019-05-13T09:39:17.460Z"}
{"count":40,"window":{"start":"2019-05-13T09:39:16.000Z","end":"2019-05-13T09:39:20.000Z"},"time":"2019-05-13T09:39:19.818Z"}
{"count":40,"window":{"start":"2019-05-13T09:39:17.000Z","end":"2019-05-13T09:39:21.000Z"},"time":"2019-05-13T09:39:19.818Z"}

Ожидаемый выходной сигнал ожидания закрытия окна на основе сравнения между отметкой времени окончания и текущим временем.

...