Spark: как очищаются состояния при дедупликации при указании водяных знаков? - PullRequest
0 голосов
/ 31 марта 2020

Мы добавили приведенный ниже код для удаления дубликатов с использованием водяных знаков

stream_df = stream_df \
               .withWatermark('postedtimets', "3600 seconds") \
               .dropDuplicates(subset=['id','postedtimets'])

Функционально код корректно удаляет дубликаты, но после нескольких дней работы мы заметили значительное замедление работы приложения.

Глядя на вкладку SQL в пользовательском интерфейсе spark, мы замечаем, что состояние не очищается, оно считывает более 1 миллиона записей состояния, хотя число записей за последний 1 час будет 20k

* 1007. * streamingDeduplicate количество строк общего состояния: 1 274 619 запоминающих элементов, использованных общим количеством состояний (мин., Мед., Макс.): 329,5 МБ (764,5 КБ, 1690,6 КБ, 2,3 МБ) выходных строк: 59 примерный размер состояния только для текущей версии всего (мин., мед., макс.): 270,6 МБ (574,6 КБ, 1388,5 КБ, 1980,5 КБ). med, max): 1,6 с (5 мс, 7 мс, 68 мс) общее время для удаления строк общее (min, med, max): 1 мс (0 мс, 0 мс, 1 мс) число обновлений Всего строк состояния: 59. Общее время обновления строк (мин., мед., макс.): 4,1 с (13 мс, 19 мс, 88 мс

. Может кто-нибудь пролить свет на то, как старые состояния могли очиститься?

...