Мы добавили приведенный ниже код для удаления дубликатов с использованием водяных знаков
stream_df = stream_df \
.withWatermark('postedtimets', "3600 seconds") \
.dropDuplicates(subset=['id','postedtimets'])
Функционально код корректно удаляет дубликаты, но после нескольких дней работы мы заметили значительное замедление работы приложения.
Глядя на вкладку SQL в пользовательском интерфейсе spark, мы замечаем, что состояние не очищается, оно считывает более 1 миллиона записей состояния, хотя число записей за последний 1 час будет 20k
* 1007. *
streamingDeduplicate количество строк общего состояния: 1 274 619 запоминающих элементов, использованных общим количеством состояний (мин., Мед., Макс.): 329,5 МБ (764,5 КБ, 1690,6 КБ, 2,3 МБ) выходных строк: 59 примерный размер состояния только для текущей версии всего (мин., мед., макс.): 270,6 МБ (574,6 КБ, 1388,5 КБ, 1980,5 КБ). med, max): 1,6 с (5 мс, 7 мс, 68 мс) общее время для удаления строк общее (min, med, max): 1 мс (0 мс, 0 мс, 1 мс) число обновлений Всего строк состояния: 59. Общее время обновления строк (мин., мед., макс.): 4,1 с (13 мс, 19 мс, 88 мс . Может кто-нибудь пролить свет на то, как старые состояния могли очиститься?