Как истечь состояние dropDuplicates в структурированной потоковой передаче с помощью оконной функции в Java, чтобы избежать OOM? - PullRequest
0 голосов
/ 17 января 2019

Я использую метод dropDuplicates () в Spark Structured Streaming 2.2.1 и мне нужно рабочее решение для использования withWatermark () для уменьшения состояния.

Моя первая попытка добиться этого была неверной, поскольку я не использовал столбец отметки времени, определенный в withWatermark (), для удаления дубликатов, потому что на самом деле я хочу удалить дубликаты, основываясь только на столбце uuid, а не на на отметке времени, которая является уникальной.

Потом я нашел этот ответ https://stackoverflow.com/a/45543643 и подумал, что моя проблема решена. Но я, кажется, реализовал это как-то неправильно, так как государство все еще растет бесконечно и никогда не будет очищено.

У меня есть ситуация, в которой мне нужен набор данных после вызова dropDuplicates (), поэтому я добавил столбец для 15-минутного окна к исходному набору данных, затем применил метод withWatermark () к окну и добавил столбец "окна" к dropDuplicates ( ). Поскольку мне нужны данные в той же схеме, что и до применения dropDuplicates (), я затем удаляю столбец «окна» и использую соответствующий кодировщик для получения требуемого набора данных.

Это мой код:

Dataset<Context> contextDataset = ...;

contextDataset
   .withColumn("window", functions.window(contextDataset.col("timestamp"), "15 minutes"))
   .withWatermark("window", "15 minutes")
   .dropDuplicates("uuid", "window")
   .drop("window")
   .as(Context.encoder());

Моя настройка: чтение не более 350000 сообщений из темы Кафки с триггером 5 минут. Когда я использую:

contextDataset
   .withWatermark("timestamp", "15 minutes")
   .dropDuplicates("uuid", "timestamp")

микропакету требуется в основном 2,5 минуты. При использовании вышеуказанного решения с «окном» поток через некоторое время ухудшается, и микробатам требуется 2 часа со многими неудачными задачами, и он становится все хуже и хуже.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...