Когда мне нужно загрузить тему без ключа как kTable
, я обычно использую тему как kStream
, переназначаю ее для извлечения ключа / значения и использую kGroupedStream
для построенияkTable
и его хранилище:
KStream<String, String> mappingStream = builder
.stream(TOPIC_MAPPING_IN, consumed)
.map(
(key, value) -> KeyValue.pair(
value.get("my_key").asText(),
value.get("my_value").asText())
);
KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(Serialized.with(Serdes.String(),Serdes.String() ));
KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
() -> "", //initializer
(aggKey, newValue, aggValue) -> newValue,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(MY_MAPPINGS_STORE)
.withValueSerde(Serdes.String())
.withKeySerde(Serdes.String())
);
Теперь мы начинаем разделять наши темы, и я пытаюсь воспользоваться GlobalKTable
, но я нашел единственный способ подделать GlobalKTable
это builder.globalTable(...)
.
Я думаю, это будет работать, если мы напишем приложение, которое загружает данные из topic_in_WITHOUT_key
, подделывает ключ и публикует в topic_in_WITH_key
.И GlobalKTable
загружается из этого topic_in_WITH_key
.
Это единственный способ?
Моя главная задача в этом решении - попросить кого-нибудь создать / настроить промежуточную тему, свозможные ошибки.В случае, который я представил во введении к kTable
, тема changelog
действовала как промежуточная тема, но эта changelog-topic
автоматически создается приложением с соответствующей конфигурацией.