Невозможно принудительно отключить окно при использовании TopologyTestDriver - PullRequest
2 голосов
/ 06 апреля 2020

Топология в вопросе:

builder.<String, String>stream(someTopic)
            .filter((k, v) -> !k.equals("heartbeat"))
            .filter((k, v) -> v != null)
            .filter(this::isRedactedInstance)
            // update our current cache of "known" redacted records
            .peek(this::updateRedactedCache)
            // change key so we can properly do the join further down
            .selectKey(this::getKeyFromRedacted)
            // now we slice the stream into 10 sec long slices and aggregate by key
            // Our redacted comes in every 10 seconds, so we should never get more than one set of redacted per cluster
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
            .aggregate(() -> null, this::aggregateRedactedByKey)
            // the suppress allows us to ignore intermediate records and only get the final 10 second worth aggregation
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .peek(this::deleteme)

Используемый для тестирования модульный тест:

TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst1), 
    getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst2), 
    getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
    redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst3), 
    getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));

1) Когда я запускаю тест в режиме отладки, с точкой останова на Метод «deleteMe» никогда не срабатывает.

2) Когда я добавляю точки останова в методе «агрегат», они достигаются ожидаемым образом (трижды)

3) Если я выполняю шаг через достаточно медленно, я делаю попал в точку останова в методе "deleteMe".

Я попытался перевести блокировку стены, но я понимаю, что это не имеет отношения к подавлению окна (и это не так кроме работы).

Я не уверен, что еще можно попробовать - я ожидал, что третье событие, с очень длинной меткой времени, вызовет подавление.

...