Ошибка водяного знака структурированной потоковой передачи Spark - PullRequest
0 голосов
/ 02 ноября 2018

Продолжение до этого вопроса

У меня потоковая передача данных в формате json, как показано ниже

|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                | 
|  XYZ  |  [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |

Мне нужно преобразовать его в формат ниже

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 

Для достижения этой цели были выполнены преобразования, предложенные в предыдущем вопросе.

val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum") 

val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))

val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C")) 

val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")

Теперь я пытаюсь сохранить результат в файл CSV в HDFS

df6.withWatermark("event_time", "0 seconds")
  .writeStream
  .trigger(Trigger.ProcessingTime("0 seconds"))
  .queryName("query_db")
  .format("parquet")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("path", "/path/to/output")
  //      .outputMode("complete")
  .start()

Теперь я получаю ошибку ниже.

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при потоковых агрегатах потоковых DataFrames / DataSets без водяного знака ;; EventTimeWatermark event_time # 223: отметка времени, интервал

Я сомневаюсь, что я не выполняю агрегирование, которое потребовало бы от него сохранения агрегированного значения сверх времени обработки для этой строки. Почему я получаю эту ошибку? Могу ли я оставить водяные знаки на 0 секунд?

Любая помощь по этому вопросу будет высоко оценена.

1 Ответ

0 голосов
/ 02 ноября 2018

Насколько я понимаю, использование водяных знаков требуется только тогда, когда вы выполняете оконную операцию во время события. Spark использовал водяные знаки для обработки поздних данных, и для той же цели Spark необходимо сохранить более старую агрегацию.

Следующая ссылка очень хорошо объясняет это на примере: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

Я не вижу никаких оконных операций в вашем преобразовании, и если это так, то я думаю, что вы можете попробовать выполнить потоковый запрос без водяных знаков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...