Apache Beam TestStream finalPane не работает, как ожидалось - PullRequest
0 голосов
/ 14 мая 2019

Я нахожусь в процессе написания простого теста для проверки семантики ранних / своевременных / поздних панелей.Конвейер объединяет количество элементов на ключ.Мои ранние и своевременные панели работают должным образом, хотя моя последняя панель, кажется, всегда пуста.

private static final Duration WINDOW_LENGTH = Duration.standardMinutes(2);
private static final Duration LATENESS_HORIZON = Duration.standardDays(1);

Мой тест выглядит следующим образом:

@Test
@Category(ValidatesRunner.class)
public void simpleTest() throws Exception {
    Instant baseTime = new Instant(0L);
    Duration one_min = Duration.standardMinutes(1);


    TestStream<KV<String, Long>> events = TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
            .advanceWatermarkTo(baseTime)

            // First element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Second element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Third element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Window ends
            .advanceWatermarkTo(baseTime.plus(WINDOW_LENGTH).plus(one_min))

            // Late element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Fire all
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Long>> userCount = p.apply(events).apply(new CountPipeline());

    IntervalWindow window = new IntervalWindow(baseTime, WINDOW_LENGTH);

    PAssert.that(userCount)  // This test works
            .inEarlyPane(window)
            .containsInAnyOrder(
                KV.of("laurens", 1L),  // First firing
                KV.of("laurens", 2L),  // Second firing
                KV.of("laurens", 3L)   // Third firing
            );

    PAssert.that(userCount) // This test works as well
            .inOnTimePane(window)
            .containsInAnyOrder(
                    KV.of("laurens", 3L) // On time firing
            );

    PAssert.that(userCount) // Test fails
            .inFinalPane(window)
            .containsInAnyOrder(
                    KV.of("laurens", 4L) // Late firing
            );

    p.run().waitUntilFinish();
}

Код конвейера выглядит следующим образом:

public static class CountPipeline extends PTransform<PCollection<KV<String, Long>>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<KV<String, Long>> events) {
        return events.apply("window", Window.<KV<String, Long>>into(FixedWindows.of(WINDOW_LENGTH))
                        .triggering(AfterWatermark
                                .pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime
                                        .pastFirstElementInPane())
                                .withLateFirings(AfterProcessingTime
                                        .pastFirstElementInPane())
                        )
                        .withAllowedLateness(LATENESS_HORIZON)
                        .accumulatingFiredPanes()
                        .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
                ).apply("Count", Count.perKey());
    }
}

Ошибка:

Expected: iterable over [<KV{laurens, 4}>] in any order
     but: No item matches: <KV{laurens, 4}> in []

Как видите, последний элемент определенно попадает после водяного знака, что по определению должно привести к задержке.Тем не менее, последняя панель не содержит уточнения исходного результата.Я, честно говоря, не понимаю, почему не выпускается поздняя панель.Любые идеи приветствуются.

1 Ответ

2 голосов
/ 16 мая 2019

FinalPane отличается от LatePane.

FinalPane, как ожидается, будет пустым в вашем тестировании, потому что ваш тестовый сценарий запускает триггер для каждого элемента, таким образом, остается NO ONE , чтобы быть в FinalPane.

Ваше намерение, насколько я могу судить по комментариям, является правильным, чтобы проверить на LatePane. По неизвестной причине этот конкретный случай LatePane отсутствует в списке утилит PAssert. Я сделал пиар, чтобы это исправить: https://github.com/apache/beam/pull/8587

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...