Как загрузить GlobalKTable из темы без ключа? - PullRequest
0 голосов
/ 25 ноября 2018

Когда мне нужно загрузить тему без ключа как kTable, я обычно использую тему как kStream, переназначаю ее для извлечения ключа / значения и использую kGroupedStream для построенияkTable и его хранилище:

KStream<String, String> mappingStream = builder
      .stream(TOPIC_MAPPING_IN, consumed)
      .map(
          (key, value) -> KeyValue.pair(
              value.get("my_key").asText(), 
              value.get("my_value").asText())
       );

KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(Serialized.with(Serdes.String(),Serdes.String() ));

KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
      () -> "", //initializer 
      (aggKey, newValue, aggValue) -> newValue,  
      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(MY_MAPPINGS_STORE)
          .withValueSerde(Serdes.String())
          .withKeySerde(Serdes.String())
       );

Теперь мы начинаем разделять наши темы, и я пытаюсь воспользоваться GlobalKTable, но я нашел единственный способ подделать GlobalKTableэто builder.globalTable(...).

Я думаю, это будет работать, если мы напишем приложение, которое загружает данные из topic_in_WITHOUT_key, подделывает ключ и публикует в topic_in_WITH_keyGlobalKTable загружается из этого topic_in_WITH_key.

Это единственный способ?

Моя главная задача в этом решении - попросить кого-нибудь создать / настроить промежуточную тему, свозможные ошибки.В случае, который я представил во введении к kTable, тема changelog действовала как промежуточная тема, но эта changelog-topic автоматически создается приложением с соответствующей конфигурацией.

...