streamWrite с опцией добавления и оконной функцией - PullRequest
0 голосов
/ 26 марта 2019

Я пытаюсь написать Stream, используя опцию добавления, но получаю ошибку.

Код:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.functions import col, column, count, when

spark = SparkSession\
        .builder\
        .appName("get_sensor_data")\
        .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Sensor = lines.select(lines.value.alias('Sensor'),
        lines.timestamp)

windowedCounts = Sensor.withWatermark('timestamp', '10 seconds').groupBy(
        window(Sensor.timestamp, windowDuration, slideDuration)).\
        agg(count(when(col('Sensor')=="LR1 On",True)).alias('LR1'),\
        count(when(col('Sensor')=="LR2 On",True)).alias('LR2'),\
        count(when(col('Sensor')=="LD On",True)).alias('LD')).\
        orderBy('window')

query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format("console")\
        .start()

Ошибка:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Причиной использования опции добавления является сохранение в виде файла CSV позже. Я думаю, что эта проблема вызвана оконной функцией, но я не знаю, как ее решить.

...