Apache beam IllegalArgumentException: небезопасный триггер может потерять данные - PullRequest
1 голос
/ 07 мая 2020

Со следующей функцией окна,

Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(30)))
        )
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(20), Window.ClosingBehavior.FIRE_IF_NON_EMPTY));

Мы встречаем ошибку ниже под Beam 2.20.0

java.lang.IllegalArgumentException: Unsafe trigger may lose data, see https://s.apache.org/finishing-triggers-drop-data: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour))
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:171)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:226)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:110)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:355)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1596)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1485)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
    at com.beam.test.monitorAdsUnit$CaculateUnitAbnormalECPM.expand(monitorAdsUnit.java:153)
    at com.beam.test.monitorAdsUnit$CaculateUnitAbnormalECPM.expand(monitorAdsUnit.java:149)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
    at com.beam.test.monitorAdsUnit.main(monitorAdsUnit.java:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

И мы нашли решение для исправления по ссылке выше

Исправление состоит в том, что для преобразования GroupByKey верхнего уровня запрещены триггеры, которые завершают sh.

Может ли кто-нибудь дать нам более ясное объяснение этого?

В настоящее время наш обходной путь заключается в неявном добавлении Repeatedly.forever(...) к триггерам верхнего уровня.

1 Ответ

2 голосов
/ 08 мая 2020

Ваш триггер AfterWatermark.withEarlyFirings(...) сработает в последний раз, когда водяной знак достигнет конца окна, а затем сбросит все последующие данные. Такие триггеры отключены из-за почти уверенности в том, что это потеря данных.

В вашем случае вы устанавливаете допустимую задержку через .withAllowedLateness(<20 minutes>). Я предполагаю, что вам нужен какой-то вывод, включающий эти 20 минут данных. Но его всегда отбрасывали. Допустимая задержка не действует, поскольку триггер отбрасывает данные.

Ваш обходной путь - правильное изменение. Это приведет к немедленной выдаче запаздывающих данных. Для лучшей читабельности я рекомендую почти всегда использовать триггер, например:

AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(...)
    .withLateFirings(...)

Это четко очерчивает «жизненный цикл» агрегирования: у вас есть одно поведение для ранних / предполагаемых результатов, одно - «готово». вывод, а затем одно поведение для результатов поздних / исправлений.

Ниже по потоку вы можете наблюдать PaneInfo, чтобы настроить обработку для трех вышеуказанных случаев.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...