Примечание: это неверный вопрос. Пожалуйста, игнорируйте.
У меня KSA прослушивает две темы (две под-топологии), одна (под-топология A) записывает в хранилище состояний, а другая (под-топология B) читает из хранилища состояний ,
Запись
...
stream
.mapValues(v -> new Version(v.getHeader().getOccurredAt().getSeconds(), v.getVersion().getValue()))
.groupByKey()
.aggregate(
() -> new Version(0,0),
(aggKey, newValue, aggValue) -> aggValue.getTimestamp() > newValue.getTimestamp() ? aggValue : newValue,
Materialized.<String, AgentVersion, KeyValueStore<Bytes, byte[]>>as(conf.versionStoreName())
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(
new Version.Serializer(), new Version.Deserializer())));
Чтение
ReadOnlyKeyValueStore<String, Version> getVersionStore() {
return app().store(conf.versionStoreName(), QueryableStoreTypes.keyValueStore());
}
Однако я обнаружил, что B не может получить данные, записанные A (A может правильно получить дату) .
Я что-то пропустил?