Как исправить «Объединение неограниченных PCollections в настоящее время поддерживается только для неглобальных окон с триггерами» в Apache Beam - PullRequest
1 голос
/ 08 июля 2019

Я пытаюсь объединить 2 неограниченных источника, используя Apache Beam Java SDK. При присоединении я получаю сообщение об ошибке ниже.

Исключение в потоке "main" java.lang.UnsupportedOperationException: присоединение неограниченно PCollections в настоящее время поддерживается только для неглобальных окон с триггеры, которые, как известно, производят вывод один раз за окно, например триггер по умолчанию с нулевым разрешенным запозданием. В этих случаях Beam может гарантировать, что он объединяет все элементы ввода один раз в окно. WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness = PT0S, Триггер = Repeatedly.forever (AfterProcessingTime.pastFirstElementInPane (). plusDelayOf (1 минут)), способ накопления = DISCARDING_FIRED_PANES, timestampCombiner = EARLIEST} не поддерживается в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger (BeamJoinRel.java:341) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.access $ 1500 (BeamJoinRel.java:98) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel $ StandardJoin.expand (BeamJoinRel.java:330) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel $ StandardJoin.expand (BeamJoinRel.java:308) в org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537) в org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:488) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection (BeamSqlRelUtils.java:67) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda $ buildPCollectionList $ 0 (BeamSqlRelUtils.java:48) на java.util.stream.ReferencePipeline $ 3 $ 1.accept (ReferencePipeline.java:193) в java.util.Iterator.forEachRemaining (Iterator.java:116) в java.util.Spliterators $ IteratorSpliterator.forEachRemaining (Spliterators.java:1801) в java.util.stream.AbstractPipeline.copyInto (AbstractPipeline.java:481) в java.util.stream.AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:471) at java.util.stream.ReduceOps $ ReduceOp.evaluateSequential (ReduceOps.java:708) в java.util.stream.AbstractPipeline.evaluate (AbstractPipeline.java:234) в java.util.stream.ReferencePipeline.collect (ReferencePipeline.java:499) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList (BeamSqlRelUtils.java:49) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection (BeamSqlRelUtils.java:65) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection (BeamSqlRelUtils.java:36) в org.apache.beam.sdk.extensions.sql.SqlTransform.expand (SqlTransform.java:100) в org.apache.beam.sdk.extensions.sql.SqlTransform.expand (SqlTransform.java:76) в org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537) в org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:488) в org.apache.beam.sdk.values.PCollectionTuple.apply (PCollectionTuple.java:167) at xyz.xyz.main (xyz.java:64)

Я пытался использовать как фиксированное, так и скользящее окно вместе с триггером (pastEndOfWindow & pastFirstElementInPane) с нулевым допустимым запаздыванием. Опробовал Accumalate & Discard. Я получаю одно и то же сообщение об ошибке каждый раз.

Ниже приведены 2 фрагмента, которые я пробовал использовать как фиксированное, так и скользящее окно.

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

Я просто хотел реализовать преобразование sql со скользящим окном, Trigger с задержкой и разрешением задержки. Пожалуйста, проведите меня до конца.

Спасибо, Gowtham

Ответы [ 2 ]

2 голосов
/ 08 июля 2019

До сих пор (2.13.0) BeamSQL не поддерживал неограниченное объединение неограниченных наборов PC с нестандартными триггерами. Для таких объединений разрешен только триггер по умолчанию (поэтому в каждом окне будет только один результат).

Основная причина заключается в том, что в текущей реализации Beam Java SDK отсутствует механизм (который называется втягиванием и накоплением) для уточнения данных в таких случаях, как Join.

1 голос
/ 10 июля 2019

Из комментария, если я правильно понимаю, желаемое поведение:

  • объединить два потока;
  • отправлять результаты каждые 30 секунд в реальном времени;
  • если данные не могут быть сопоставлены, подождите соответствующую соответствующую запись в течение 30 минут максимум;
  • сбросить записи через 30 минут;

В основном это вид непрерывного скользящего сопоставления последних 30 минут данных в обоих потоках, и результаты выдаются каждые 30 секунд.

Хорошей новостью является то, что должна быть возможность реализовать в Beam Java (возможно, также и в Python). Плохая новость, что, вероятно, было бы нетривиально в Java, и я не думаю, что сейчас это вообще возможно в SQL.

Как бы это выглядело:

  • ввод должен быть в глобальном окне;
  • имейте состояние ParDo (или this ), которое отслеживает все видимые элементы, сохраняя их в ячейке состояния:
    • вам, вероятно, потребуется использовать боковой вход или заранее применить CoGroupByKey, чтобы иметь доступ к элементам с обоих входов в одном и том же ParDo;
    • боковые входы и CoGroupByKey имеют различную семантику и с ними может быть нелегко работать;
  • на каждом входе вручную проверять состояние для соответствующих записей;
  • либо сразу генерировать результаты, либо хранить их в другой ячейке состояния;
  • имеет таймер , который удаляет старые несопоставленные записи:
    • вам может понадобиться вручную отслеживать метки времени и другие вещи;
  • применить желаемое окно / триггер к выходу при необходимости;

Я предлагаю вам прочитать этот пример , он делает таймер и устанавливает часть того, что вам нужно (он ожидает сопоставления записей, сохраняет несопоставленные записи в состоянии и очищает состояние при срабатывании таймера) и использует CoGroupByKey. Возможно, вы поймете, как это работает, после того, как поймете этот пример.

...