Как можно избежать создания журналов изменений в потоках Kafka? - PullRequest
0 голосов
/ 09 июля 2019

Я пытаюсь избежать создания разделов журнала изменений в потоках Kafka, используя inMemoryWindowStore (я использую Kafka 2.3.0 и Streams DSL), я также вызываю withLoggingDisabled(), но каким-то образом, когда приложение запускает разделы журнала изменений,создал, а также использовал, потому что я могу видеть данные в них.Что я делаю неправильно?Как я могу избежать создания журналов изменений?

    WindowBytesStoreSupplier storeSupplier = Stores.inMemoryWindowStore("in-mem-store-" + index,
            Duration.ofSeconds(windowRetentionPeriodInSeconds),
            Duration.ofSeconds(aggregationWindowSizeInSeconds),
            false);

    myStream.filter((key, val) -> val!=null)
            .selectKey((key, val) -> val.getId())
            .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
            .aggregate(MyDto::new,
                    new MyUpdater(),
                    Materialized.as(storeSupplier)
                            .withCachingDisabled()
                            .withLoggingDisabled()
                            .with(Serdes.String(), new MyDtoSerde()))

1 Ответ

0 голосов
/ 19 июля 2019

как объяснил Билл Бежек здесь , использование статических методов Materialized в 2.3.0 немного сложно.

Я решил проблему следующим образом:

    Materialized<String, MyDto, WindowStore<Bytes, byte[]>> materialized;
    materialized = Materialized.with(Serdes.String(), new MyDtoSerde());
    if (withLoggingDisabled) {
        materialized.withLoggingDisabled();
    }

    myStream.filter((key, val) -> val!=null)
        .selectKey((key, val) -> val.getId())
        .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(aggregationWindowSizeInSeconds))
                   .grace(Duration.ofSeconds(windowRetentionPeriodInSeconds)))
        .aggregate(MyDto::new,
                   new MyUpdater(),
                   materialized)
...