Просто чтобы уточнить, я новичок в Kafka, извините, если мои вопросы кажутся недокументированными, я читаю учебные пособия, документы и все, что могу, чтобы это понять.
Я пытаюсь прочитать все значения изGlobalStore для обновления его значений, а затем используйте уже существующий StateStore, чтобы поместить эти новые обновленные значения.
Я пытаюсь сделать это, потому что когда я делаю:
this.stateStore.all();
У меня только 1/10 данных, если я правильно понял, это потому, что у меня 10 разделов, а ss читает только один (хотя я не совсем понимаю, почему)
Это моя глобальная таблица:
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
И это addStateStore, который я не могу удалить, потому что он используется в другом месте кода:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
Итак, теоретически, я думал об удалении, чтобы удалитьStateStore, который также использует ту же тему, и помещает мои данные, используя одну из моих тем data.process, проблема в том, что этот процессор делает другие вещи с этим StateStore, поэтому я не могу уничтожить его.
Я потерялся здесь, любой свет очень помог бы. Спасибо!