Мне нужно создать хранилище состояний с количеством аутентифицированных пользователей в день, чтобы я мог получить количество аутентифицированных пользователей за последний день, за последние 7 дней и за последние 30 дней. Для этого каждое событие аутентификации отправляется в событие auth-topi c. Я передаю эту топику c и создаю окно на каждый день. Код:
KStream<String, GenericRecord> authStream = builder.stream("auth-event", Consumed.with(stringSerde, valueSerde)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
.withTimestampExtractor(new TransactionTimestampExtractor()));
authStream
.groupBy(( String key, GenericRecord value) -> value.get("tenantId").toString(), Grouped.with(Serdes.String(), valueSerde))
.windowedBy(TimeWindows.of(Duration.ofDays(1)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store")
.withKeySerde(stringSerde)
.withValueSerde(longSerde))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to("auth-result-topic", Produced.with(timeWindowedSerdeFrom(String.class), Serdes.Long()));
После этого я вставляю записи в топи c. Также у меня есть контроллер покоя, и я читаю магазин, используя ReadOnlyWindowStore. Параметр дня отправляется из пользовательского интерфейса и может составлять 1, 7 или 30 дней. Это означает, что я хотел бы прочитать последние 7 windows. Код:
final ReadOnlyWindowStore<String, Long> dayStore = kafkaStreams.store(KStreamsLdapsExample.authResultTable, QueryableStoreTypes.windowStore());
Instant timeFrom = (Instant.now().minus(Duration.ofDays(days)));
LocalDate currentDate = LocalDate.now();
LocalDateTime currentDayTime = currentDate.atTime(23, 59, 59);
Instant timeTo = Instant.ofEpochSecond(currentDayTime.toEpochSecond(ZoneOffset.UTC));
try(WindowStoreIterator<Long> it1 = dayStore.fetch(tenant, timeFrom, timeTo)) {
Long count = 0L;
JsonObject jsonObject = new JsonObject();
while (it1.hasNext())
{
final KeyValue<Long, Long> next = it1.next();
Date resultDate = new Date(next.key);
jsonObject.addProperty(resultDate.toString(), next.value);
count += next.value;
}
jsonObject.addProperty("tenant", tenant);
jsonObject.addProperty("Total number of events", count);
return ResponseEntity.ok(jsonObject.toString());
}
Проблема в том, что я могу получить результаты только за 1-2 дня. После этого старые windows теряются. Другая проблема - информация, записанная в выводе topi c: «auth-result-topi c» Я читаю результаты с консолью-потребителем, и там много пустых записей, ни ключа, ни значения, и некоторые записи с некоторым случайным числом. введите описание изображения здесь
Есть идеи, что происходит с моим магазином? Как читать мимо N windows? Спасибо