Я использую метод dropDuplicates () в Spark Structured Streaming 2.2.1 и мне нужно рабочее решение для использования withWatermark () для уменьшения состояния.
Моя первая попытка добиться этого была неверной, поскольку я не использовал столбец отметки времени, определенный в withWatermark (), для удаления дубликатов, потому что на самом деле я хочу удалить дубликаты, основываясь только на столбце uuid, а не на на отметке времени, которая является уникальной.
Потом я нашел этот ответ https://stackoverflow.com/a/45543643 и подумал, что моя проблема решена. Но я, кажется, реализовал это как-то неправильно, так как государство все еще растет бесконечно и никогда не будет очищено.
У меня есть ситуация, в которой мне нужен набор данных после вызова dropDuplicates (), поэтому я добавил столбец для 15-минутного окна к исходному набору данных, затем применил метод withWatermark () к окну и добавил столбец "окна" к dropDuplicates ( ). Поскольку мне нужны данные в той же схеме, что и до применения dropDuplicates (), я затем удаляю столбец «окна» и использую соответствующий кодировщик для получения требуемого набора данных.
Это мой код:
Dataset<Context> contextDataset = ...;
contextDataset
.withColumn("window", functions.window(contextDataset.col("timestamp"), "15 minutes"))
.withWatermark("window", "15 minutes")
.dropDuplicates("uuid", "window")
.drop("window")
.as(Context.encoder());
Моя настройка: чтение не более 350000 сообщений из темы Кафки с триггером 5 минут. Когда я использую:
contextDataset
.withWatermark("timestamp", "15 minutes")
.dropDuplicates("uuid", "timestamp")
микропакету требуется в основном 2,5 минуты.
При использовании вышеуказанного решения с «окном» поток через некоторое время ухудшается, и микробатам требуется 2 часа со многими неудачными задачами, и он становится все хуже и хуже.