как использовать globalKtable и StateStore на одну тему? - PullRequest
0 голосов
/ 04 ноября 2019

Просто чтобы уточнить, я новичок в Kafka, извините, если мои вопросы кажутся недокументированными, я читаю учебные пособия, документы и все, что могу, чтобы это понять.

Я пытаюсь прочитать все значения изGlobalStore для обновления его значений, а затем используйте уже существующий StateStore, чтобы поместить эти новые обновленные значения.

Я пытаюсь сделать это, потому что когда я делаю:

this.stateStore.all();

У меня только 1/10 данных, если я правильно понял, это потому, что у меня 10 разделов, а ss читает только один (хотя я не совсем понимаю, почему)

Это моя глобальная таблица:

    public StreamsBuilder declareTopology(StreamsBuilder builder) {

        logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
                getInputTopic(),
                getDataTopic(),
                getToEsTopic());

        builder.globalTable(
                getDataTopic(),
                Consumed.with(Serdes.String(), fooSerdes)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
                Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
                        "foosktable")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(fooSerdes)
                        .withLoggingEnabled(new HashMap<>()));
    ...

И это addStateStore, который я не могу удалить, потому что он используется в другом месте кода:

       ...

       builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("foosktable"),
                    Serdes.String(),
                    fooSerdes));
    ...

    return builder;
}

Итак, теоретически, я думал об удалении, чтобы удалитьStateStore, который также использует ту же тему, и помещает мои данные, используя одну из моих тем data.process, проблема в том, что этот процессор делает другие вещи с этим StateStore, поэтому я не могу уничтожить его.

Я потерялся здесь, любой свет очень помог бы. Спасибо!

1 Ответ

2 голосов
/ 05 ноября 2019

Немного неясно, чего вы на самом деле пытаетесь достичь. Тем не менее, несколько объяснений высокого уровня:

A GlobalKTable имеет только одну цель: читать данные без изменений из темы, чтобы разрешить либо KStream-GlobalKTable -соединение, либо запросить хранилище через "интерактив"запросы ".

Следовательно, вы не можете действительно делать то, что хотите, поскольку копирование данных из глобального хранилища в другое хранилище невозможно в соответствии с вашими намерениями. Вам нужно будет продублировать тему ввода и прочитать ее дважды: (1) как GlobalKTable и (2) как обычный KStream, чтобы изменить данные перед тем, как поместить их в хранилище. Для (2) вы можете использовать transform().

Надеюсь, это поможет.

...