Топология в вопросе:
builder.<String, String>stream(someTopic)
.filter((k, v) -> !k.equals("heartbeat"))
.filter((k, v) -> v != null)
.filter(this::isRedactedInstance)
// update our current cache of "known" redacted records
.peek(this::updateRedactedCache)
// change key so we can properly do the join further down
.selectKey(this::getKeyFromRedacted)
// now we slice the stream into 10 sec long slices and aggregate by key
// Our redacted comes in every 10 seconds, so we should never get more than one set of redacted per cluster
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)).advanceBy(Duration.ofSeconds(10)).grace(Duration.ofSeconds(3)))
.aggregate(() -> null, this::aggregateRedactedByKey)
// the suppress allows us to ignore intermediate records and only get the final 10 second worth aggregation
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek(this::deleteme)
Используемый для тестирования модульный тест:
TopologyTestDriver driver = new TopologyTestDriver(redacted.getMainTopology(), streamingProperties);
driver.pipeInput(
redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst1),
getEntityEventString(inst1, instSeed1), 0L));
driver.pipeInput(
redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst2),
getEntityEventString(inst1, instSeed1), 1L));
driver.pipeInput(
redactedRecordFactory.create(REDACTED_TOPIC,getRedactedKey(inst3),
getEntityEventString(inst3, instSeed3), Duration.ofDays(5).toMillis()));
1) Когда я запускаю тест в режиме отладки, с точкой останова на Метод «deleteMe» никогда не срабатывает.
2) Когда я добавляю точки останова в методе «агрегат», они достигаются ожидаемым образом (трижды)
3) Если я выполняю шаг через достаточно медленно, я делаю попал в точку останова в методе "deleteMe".
Я попытался перевести блокировку стены, но я понимаю, что это не имеет отношения к подавлению окна (и это не так кроме работы).
Я не уверен, что еще можно попробовать - я ожидал, что третье событие, с очень длинной меткой времени, вызовет подавление.