Водяной знак PubSub не продвигается - PullRequest
0 голосов
/ 05 июня 2018

Я написал задание Apache Beam с использованием Scio с целью генерации идентификаторов сеансов для входящих записей данных, а затем каким-то образом их обогащать, прежде чем выводить их в BigQuery.Вот код:

val measurements = sc.customInput("ReadFromPubsub",
  PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("ts")
    .fromSubscription(subscription)
)

measurements
    .map(extractMeasurement).flatMap {
      case Success(event) =>
        Some(event)
      case Failure(ex) =>
        None
    }
    .timestampBy(_.timestamp)
    .withSessionWindows(sessionGap, WindowOptions(
      trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.standardDays(1),
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
    .keyBy(_.clientID)
    .groupByKey
    .toWindowed
    .map(assignSessionID)
    .toSCollection.flatMap(_.results)
    .map(enrich)
    .saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)

Я использую метку времени события, которая является значением ключа атрибута ts в PubsubMessage, в качестве моего атрибута метки времени.Это та же самая временная метка, что и в .timestampBy, которую я использую, до того, как мои данные будут обработаны.Я ожидаю, что выходной триггер сработает, как только водяной знак пройдет мимо sessionGap (по умолчанию 30 минут).

При использовании как Dataflow runner, так и DirectRunner триггер никогда не срабатывает, даже если я моделирую данные с помощьювременные метки с интервалом более 30 минут.В пользовательском интерфейсе потока данных я вижу, что водяной знак никогда не продвигается на основе отметок времени события, а только каждую минуту, как будто данные не были получены.

Я проверил, что данные действительно были получены, так как выполняется преобразование перед оконным управлением.Я также протестировал с около 10 записей в секунду , но, возможно, этого все еще недостаточно для обновления водяного знака?Я также настроил JobTest, в котором я получаю ожидаемый результат, а также сигнализирую мне, что проблема основана на отметке времени / водяного знака.

Я уверен, что пропустил что-то важное в документации или где-то допустил глупую ошибку и надеялся, что кто-то может указать мне правильное направление.

1 Ответ

0 голосов
/ 29 июня 2018

Вы можете попробовать добавить раннее и позднее срабатывание в AfterWatermark.pastEndofWindows, чтобы увидеть, обновляется ли водяной знак, а также проверить наличие данных, поступающих с опозданием.Также вы можете найти документацию по триггерам здесь .

...