Я получаю ошибку ниже в следующем фрагменте кода -
Исключение в потоке "главная" организация. apache .spark. sql .AnalysisException: Добавить режим вывода не поддерживается, когда есть потоковые агрегаты при потоковой передаче данных DataFrames / DataSets без водяного знака ;;
Ниже приведена моя схема ввода
val schema = new StructType()
.add("product",StringType)
.add("org",StringType)
.add("quantity", IntegerType)
.add("booked_at",TimestampType)
Создание набора исходных потоковых данных
val payload_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test1")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) as payload")
.select(from_json(col("payload"), schema).as("data"))
.select("data.*")
Создание другой потоковой передачи фрейм данных, в котором выполняется агрегация, а затем объединить его с исходным фреймом данных, чтобы отфильтровать записи
payload_df.createOrReplaceTempView("orders")
val stage_df = spark.sql("select org, product, max(booked_at) as booked_at from orders group by 1,2")
stage_df.createOrReplaceTempView("stage")
val total_qty = spark.sql(
"select o.* from orders o join stage s on o.org = s.org and o.product = s.product and o.booked_at > s.booked_at ")
Наконец, я пытался отобразить результаты на консоли с режимом добавления выходных данных. Я не могу понять, где мне нужно добавить водяной знак или как решить эту проблему. Моя цель - отфильтровать только те события в каждом триггере, которые имеют более высокую временную метку, чем максимальная временная метка, полученная в любом из более ранних триггеров
total_qty
.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()