Водяной знак не продвигается при использовании отличительного преобразования - PullRequest
0 голосов
/ 25 апреля 2019

Я попытался ввести дедупликацию событий на основе уникального ключа с помощью преобразования Distinct.withRepresentativeValue.Кажется, все работает нормально (например, данные записываются приемником), но system lag растет, а watermark застревает на месте.

Что я делаю не так?Как настроить управление окнами так, чтобы оно начинало прогрессировать?

Конвейер использует источник PubSubIO с настройкой временной метки по умолчанию (я думаю, это сообщение publishTime по умолчанию).После некоторой начальной обработки события дедуплицируются и записываются в BigQuery и другие выходные темы PubSub.

Я хотел бы настроить дедупликацию так, чтобы она выводила одно событие на уникальный ключ, например, в 15-минутном окне с запуском по водяному знаку, чтобы избежать потери данных.Проблема в том, что Distinct преобразование водяного знака не происходит вообще, а системное отставание постоянно увеличивается.

Дедупликация PTransform выглядит так:

override fun expand(input: PCollection<EventRecord>): PCollection<EventRecord> {
  val duration = Duration.standardMinutes(15)
  return input
    .apply("Window", Window.into<EventRecord>(FixedWindows.of(duration))
    .triggering(AfterWatermark.pastEndOfWindow()))
    .apply("Distinct", Distinct.withRepresentativeValueFn(ExtractHashFn()))
}

Снимки экрана с выполнением задания можно найти здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...