В чем разница между триггером afterwatermark и триггером по умолчанию в Apache Beam? - PullRequest
0 голосов
/ 20 сентября 2019

Согласно документации Apache Beam

Триггер AfterWatermark работает во время события.Триггер AfterWatermark выдает содержимое окна после того, как водяной знак проходит через конец окна, основываясь на временных метках, прикрепленных к элементам данных.Водяной знак является глобальной метрикой прогресса и представляет собой представление Beam о полноте ввода в вашем конвейере в любой заданной точке.AfterWatermark срабатывает только тогда, когда водяной знак проходит через конец окна.

Триггер по умолчанию для PCollection основан на времени события и генерирует результаты окна, когда водяной знак Beam проходит через конец окна, изатем срабатывает каждый раз, когда поступают поздние данные.Однако, если вы используете как конфигурацию окон по умолчанию, так и триггер по умолчанию, триггер по умолчанию выдает ровно один раз, а поздние данные отбрасываются.

Я попытался реализовать их оба, и я получил аналогичные выходные данныеиспользуя фиксированное окно.

с триггером Afterwatermark:

lines |'timestamp' >> beam.Map(get_timestamp)
           | 'window' >> beam.WindowInto(
            window.FixedWindows(20),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
        | 'printnbrarticles' >> beam.ParDo(PrintFn())
        | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))
        | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
        | 'send_to_Pub/Sub' >> beam.io.WriteToPubSub(known_args.out_topic)
    )

с триггером по умолчанию:

lines |'timestamp' >> beam.Map(get_timestamp)
           | 'window' >> beam.WindowInto(
            window.FixedWindows(20),
        )
        | 'CountGlobally' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults() 
        | 'printnbrarticles' >> beam.ParDo(PrintFn())
        | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))
        | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
        | 'send_to_Pub/Sub' >> beam.io.WriteToPubSub(known_args.out_topic)
    )

1 Ответ

1 голос
/ 20 сентября 2019

Вы не видите разницы, потому что у вас нет поздних данных.Как описано в документации, Afterwatermark запускается только один раз после водяного знака, по умолчанию каждый триггер вызывается после водяного знака AND .

Кроме того, с триггером Afterwatermark выМожно настроить дополнительное поведение (и вызов) в случае ранних данных (данные поступают до открытия окна) или поздних данных (поступают после водяного знака).

Нельзя настроить это с помощью триггера по умолчанию.

...