Продолжение до этого вопроса
У меня потоковая передача данных в формате json, как показано ниже
| A | B |
|-------|------------------------------------------|
| ABC | [{C:1, D:1}, {C:2, D:4}] |
| XYZ | [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |
Мне нужно преобразовать его в формат ниже
| A | C | D |
|-------|-----|------|
| ABC | 1 | 1 |
| ABC | 2 | 4 |
| XYZ | 3 | 6 |
| XYZ | 9 | 11 |
| XYZ | 5 | 12 |
Для достижения этой цели были выполнены преобразования, предложенные в предыдущем вопросе.
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
Теперь я пытаюсь сохранить результат в файл CSV в HDFS
df6.withWatermark("event_time", "0 seconds")
.writeStream
.trigger(Trigger.ProcessingTime("0 seconds"))
.queryName("query_db")
.format("parquet")
.option("checkpointLocation", "/path/to/checkpoint")
.option("path", "/path/to/output")
// .outputMode("complete")
.start()
Теперь я получаю ошибку ниже.
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при потоковых агрегатах потоковых DataFrames / DataSets без водяного знака ;;
EventTimeWatermark event_time # 223: отметка времени, интервал
Я сомневаюсь, что я не выполняю агрегирование, которое потребовало бы от него сохранения агрегированного значения сверх времени обработки для этой строки. Почему я получаю эту ошибку? Могу ли я оставить водяные знаки на 0 секунд?
Любая помощь по этому вопросу будет высоко оценена.