Я попытался ввести дедупликацию событий на основе уникального ключа с помощью преобразования Distinct.withRepresentativeValue
.Кажется, все работает нормально (например, данные записываются приемником), но system lag
растет, а watermark
застревает на месте.
Что я делаю не так?Как настроить управление окнами так, чтобы оно начинало прогрессировать?
Конвейер использует источник PubSubIO с настройкой временной метки по умолчанию (я думаю, это сообщение publishTime
по умолчанию).После некоторой начальной обработки события дедуплицируются и записываются в BigQuery и другие выходные темы PubSub.
Я хотел бы настроить дедупликацию так, чтобы она выводила одно событие на уникальный ключ, например, в 15-минутном окне с запуском по водяному знаку, чтобы избежать потери данных.Проблема в том, что Distinct
преобразование водяного знака не происходит вообще, а системное отставание постоянно увеличивается.
Дедупликация PTransform
выглядит так:
override fun expand(input: PCollection<EventRecord>): PCollection<EventRecord> {
val duration = Duration.standardMinutes(15)
return input
.apply("Window", Window.into<EventRecord>(FixedWindows.of(duration))
.triggering(AfterWatermark.pastEndOfWindow()))
.apply("Distinct", Distinct.withRepresentativeValueFn(ExtractHashFn()))
}
Снимки экрана с выполнением задания можно найти здесь