Использование таймера и бокового ввода на ParDo в Apache Beam - PullRequest
0 голосов
/ 05 октября 2018

Я пытаюсь написать ParDo, который будет использовать таймер и боковой ввод, но он вылетает, когда я пытаюсь запустить его с beam-runners-direct-java с IllegalArgumentException в строке https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L167,, потому что естьфактически два входа в ParDo (основной набор PC и боковой вход), хотя ожидается только один.

Есть ли способ обойти это?Это ошибка в Beam?

Вот фрагмент кода, который воспроизводит это поведение:

public class TestCrashesForTimerAndSideInput {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @RequiredArgsConstructor
    private static class DoFnWithTimer extends DoFn<KV<String, String>, String> {
        private final PCollectionView<Map<String, String>> sideInput;
        @TimerId("t")
        private final TimerSpec tSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @ProcessElement
        public void processElement(ProcessContext c, @TimerId("t") Timer t) {
            KV<String, String> element = c.element();
            c.output(element.getKey() + c.sideInput(sideInput).get(element));
            t.offset(Duration.standardSeconds(1)).setRelative();
        }

        @OnTimer("t")
        public void onTimerFire(OnTimerContext x) {
            x.output("Timer fired");
        }
    }

    @Test
    public void testCrashesForTimerAndSideInput() {
        ImmutableMap<String, String> sideData = ImmutableMap.<String, String>builder().
                put("x", "X").
                put("y", "Y").
                build();

        PCollectionView<Map<String, String>> sideInput =
                p.apply(Create.of(sideData)).apply(View.asMap());

        TestStream<String> testStream = TestStream.create(StringUtf8Coder.of()).
                addElements("x").
                advanceProcessingTime(Duration.standardSeconds(1)).
                addElements("y").
                advanceProcessingTime(Duration.standardSeconds(1)).
                advanceWatermarkToInfinity();

        PCollection<String> result = p.
                apply(testStream).
                apply(MapElements.into(kvs(strings(), strings())).via(v -> KV.of(v, v))).
                apply(ParDo.of(new DoFnWithTimer(sideInput)).withSideInputs(sideInput));

        PAssert.that(result).containsInAnyOrder("X", "Y", "Timer fired");
        p.run();
    }

}

и исключение:

java.lang.IllegalArgumentException: expected one element but was: <ParDo(DoFnWithTimer)/ParMultiDo(DoFnWithTimer)/To KeyedWorkItem/ParMultiDo(ToKeyedWorkItem).output [PCollection], View.AsMap/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization).output [PCollection]>

    at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
    at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
    at org.apache.beam.runners.direct.QuiescenceDriver.fireTimers(QuiescenceDriver.java:167)
    at org.apache.beam.runners.direct.QuiescenceDriver.drive(QuiescenceDriver.java:110)
    at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$2.run(ExecutorServiceParallelExecutor.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
...