Как использовать временное окно Кафки для исторической агрегации? - PullRequest
0 голосов
/ 03 апреля 2020

Мне нужно создать хранилище состояний с количеством аутентифицированных пользователей в день, чтобы я мог получить количество аутентифицированных пользователей за последний день, за последние 7 дней и за последние 30 дней. Для этого каждое событие аутентификации отправляется в событие auth-topi c. Я передаю эту топику c и создаю окно на каждый день. Код:

KStream<String, GenericRecord> authStream = builder.stream("auth-event", Consumed.with(stringSerde, valueSerde)
            .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST)
            .withTimestampExtractor(new TransactionTimestampExtractor()));

        authStream 
                .groupBy(( String key, GenericRecord value) -> value.get("tenantId").toString(), Grouped.with(Serdes.String(), valueSerde))
                .windowedBy(TimeWindows.of(Duration.ofDays(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("auth-result-store")
                        .withKeySerde(stringSerde)
                        .withValueSerde(longSerde))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .to("auth-result-topic", Produced.with(timeWindowedSerdeFrom(String.class), Serdes.Long()));

После этого я вставляю записи в топи c. Также у меня есть контроллер покоя, и я читаю магазин, используя ReadOnlyWindowStore. Параметр дня отправляется из пользовательского интерфейса и может составлять 1, 7 или 30 дней. Это означает, что я хотел бы прочитать последние 7 windows. Код:

final ReadOnlyWindowStore<String, Long> dayStore = kafkaStreams.store(KStreamsLdapsExample.authResultTable, QueryableStoreTypes.windowStore());

        Instant timeFrom = (Instant.now().minus(Duration.ofDays(days)));

        LocalDate currentDate = LocalDate.now();
        LocalDateTime currentDayTime = currentDate.atTime(23, 59, 59);
        Instant timeTo = Instant.ofEpochSecond(currentDayTime.toEpochSecond(ZoneOffset.UTC));

        try(WindowStoreIterator<Long> it1 = dayStore.fetch(tenant, timeFrom, timeTo)) {
            Long count = 0L;
            JsonObject jsonObject = new JsonObject();
            while (it1.hasNext())
            {
                final KeyValue<Long, Long> next = it1.next();
                Date resultDate = new Date(next.key);
                jsonObject.addProperty(resultDate.toString(), next.value);
                count += next.value;
            }

            jsonObject.addProperty("tenant", tenant);
            jsonObject.addProperty("Total number of events", count);

            return ResponseEntity.ok(jsonObject.toString());
        }

Проблема в том, что я могу получить результаты только за 1-2 дня. После этого старые windows теряются. Другая проблема - информация, записанная в выводе topi c: «auth-result-topi c» Я читаю результаты с консолью-потребителем, и там много пустых записей, ни ключа, ни значения, и некоторые записи с некоторым случайным числом. введите описание изображения здесь

Есть идеи, что происходит с моим магазином? Как читать мимо N windows? Спасибо

1 Ответ

0 голосов
/ 04 апреля 2020

Вам нужно будет увеличить время хранения в магазине (по умолчанию 1 день) через Materialize.as(...).withRetention(...), который вы можете передать оператору count().

Вы также можете увеличить период отсрочки в окне через TimeWindows.of(Duration.ofDays(1)).grace(...).

Для чтения данных с консоли потребителя: вам нужно будет указать правильный десериализатор. Window-serde и long-serde, которые вы используете для записи в выходные данные topi c, используют двоичные форматы, в то время как консольный потребитель по умолчанию принимает строковый тип данных. Существуют соответствующие параметры командной строки, которые вы можете указать для установки различных десериализаторов ключей и значений, которые должны соответствовать сериализаторам, которые вы используете при записи в topi c.

...