Сообщения, отбрасываемые из-за истекших окон, даже если для этого конкретного ключа окно не должно закрываться
Я хочу сгруппировать сообщения, полученные из одной темы раздела, и окном этих сообщений на 30 секунд в зависимости от времени события. Чтобы избежать немедленной обработки, я вызываю метод подавления, а также использую метод .grace. Как только окна закроются (через 30 секунд + льготный период 0), я ожидаю, что окончательный результат будет добавлен в тему. Сообщения, которые я использую в этой теме, имеют два разных ключа: 300483976 и 300485339. Сообщения, которые я потребляю, увеличивают время события на 10 секунд. Я читал, что время потока увеличивается только на основе новых сообщений, которые увеличивают время события. Это тоже то, что я испытываю. Однако проблема, которую я вижу, заключается в следующем:
Я использую первые 10 сообщений для ключа 300483976. Основываясь на методе "KStreamWindowAggregate.process", я замечаю, что internalProcessorContext.streamTime () увеличивается каждый раз, основываясь на последнем использованном сообщении. После обработки 10 сообщений конечное время события теперь равно времени начала + 300 секунд. После этого сообщения для ключа 300485339 израсходованы. Все, но последние сообщения помечаются как просроченные и отбрасываются сообщением «Пропуск записи для просроченного окна». Кажется, что internalProcessorContext.streamTime () все еще запоминает последнее значение первого запуска и поэтому отбрасывает сообщения с ключом 300485339.
stream
.groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(30))
.grace(Duration.ofMillis(0))) // override the default of 24 hours
.aggregate(Data::new, transform(), materialize())
.filter((key, value) -> {
log.info("agg {} {}", key, value.toString());
return true;
})
.suppress(
Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
Я ожидаю, что, поскольку сообщения сгруппированы по ключу (300483976 и 300485339), время потока не будет "общим". Я ожидаю, что будут отдельные окна для ключа 300483976 и ключа 300485339. Есть идеи, что не так? Я использую kafka-streams 2.1.0 и timestampextractor, который получает время события из поля в сообщении.
UPDATE
Я провел дополнительное тестирование и адаптировал пример, который не использует агрегат, но показывает ту же проблему со временем потока:
@Test
public void shouldSupportFinalResultsForTimeWindows() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
.windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
final ConsumerRecordFactory<String, String> recordFactory =
new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
// note this last records sets the streamtime to 7L causing the next messages to be discarded
driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
}
}
В приведенном выше примере второе сообщение устанавливает время потока равным 7L, в результате чего создаваемое окно от 0 до 2 закрывается, даже если сообщение имеет другой ключ. Это также приводит к тому, что следующая пара сообщений будет отброшена, даже если есть ключ k1. Таким образом, из этого примера становится ясно, что ключи не учитываются. Если это на самом деле так, как задумано, мне интересно, каков сценарий для этого. Особенно, когда я думаю, что довольно часто в теме есть сообщения с разными разделами, и у одного раздела могут быть совершенно разные сообщения с временами потока (возникшими из времени события) от других разделов. Надеюсь, вы можете пролить свет на это ??