Я пытаюсь объединить 2 неограниченных источника, используя Apache Beam Java SDK. При присоединении я получаю сообщение об ошибке ниже.
Исключение в потоке "main" java.lang.UnsupportedOperationException: присоединение неограниченно
PCollections в настоящее время поддерживается только для неглобальных окон с
триггеры, которые, как известно, производят вывод один раз за окно, например
триггер по умолчанию с нулевым разрешенным запозданием. В этих случаях Beam может
гарантировать, что он объединяет все элементы ввода один раз в окно.
WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117,
allowedLateness = PT0S,
Триггер = Repeatedly.forever (AfterProcessingTime.pastFirstElementInPane (). plusDelayOf (1
минут)), способ накопления = DISCARDING_FIRED_PANES,
timestampCombiner = EARLIEST} не поддерживается
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger (BeamJoinRel.java:341)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.access $ 1500 (BeamJoinRel.java:98)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel $ StandardJoin.expand (BeamJoinRel.java:330)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel $ StandardJoin.expand (BeamJoinRel.java:308)
в org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
в org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:488)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection (BeamSqlRelUtils.java:67)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda $ buildPCollectionList $ 0 (BeamSqlRelUtils.java:48)
на java.util.stream.ReferencePipeline $ 3 $ 1.accept (ReferencePipeline.java:193)
в java.util.Iterator.forEachRemaining (Iterator.java:116)
в java.util.Spliterators $ IteratorSpliterator.forEachRemaining (Spliterators.java:1801)
в java.util.stream.AbstractPipeline.copyInto (AbstractPipeline.java:481)
в java.util.stream.AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:471)
at java.util.stream.ReduceOps $ ReduceOp.evaluateSequential (ReduceOps.java:708)
в java.util.stream.AbstractPipeline.evaluate (AbstractPipeline.java:234)
в java.util.stream.ReferencePipeline.collect (ReferencePipeline.java:499)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList (BeamSqlRelUtils.java:49)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection (BeamSqlRelUtils.java:65)
в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection (BeamSqlRelUtils.java:36)
в org.apache.beam.sdk.extensions.sql.SqlTransform.expand (SqlTransform.java:100)
в org.apache.beam.sdk.extensions.sql.SqlTransform.expand (SqlTransform.java:76)
в org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
в org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:488)
в org.apache.beam.sdk.values.PCollectionTuple.apply (PCollectionTuple.java:167)
at xyz.xyz.main (xyz.java:64)
Я пытался использовать как фиксированное, так и скользящее окно вместе с триггером (pastEndOfWindow & pastFirstElementInPane) с нулевым допустимым запаздыванием. Опробовал Accumalate & Discard. Я получаю одно и то же сообщение об ошибке каждый раз.
Ниже приведены 2 фрагмента, которые я пробовал использовать как фиксированное, так и скользящее окно.
p1.apply("window",
Window
.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
p1.apply("window2",
Window.<Row>into(
SlidingWindows
.of(Duration.standardSeconds(30))
.every(Duration.standardSeconds(5)))
.triggering(
Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
Я просто хотел реализовать преобразование sql со скользящим окном, Trigger с задержкой и разрешением задержки. Пожалуйста, проведите меня до конца.
Спасибо,
Gowtham