Я пытаюсь написать Stream, используя опцию добавления, но получаю ошибку.
Код:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.functions import col, column, count, when
spark = SparkSession\
.builder\
.appName("get_sensor_data")\
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
Sensor = lines.select(lines.value.alias('Sensor'),
lines.timestamp)
windowedCounts = Sensor.withWatermark('timestamp', '10 seconds').groupBy(
window(Sensor.timestamp, windowDuration, slideDuration)).\
agg(count(when(col('Sensor')=="LR1 On",True)).alias('LR1'),\
count(when(col('Sensor')=="LR2 On",True)).alias('LR2'),\
count(when(col('Sensor')=="LD On",True)).alias('LD')).\
orderBy('window')
query = windowedCounts\
.writeStream\
.outputMode('append')\
.format("console")\
.start()
Ошибка:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Причиной использования опции добавления является сохранение в виде файла CSV позже.
Я думаю, что эта проблема вызвана оконной функцией, но я не знаю, как ее решить.