Вы можете использовать flatMap()
или flatMapValues()
методы.Эти методы берут одну запись и дают ноль, одну или несколько записей.
flatMap()
может изменять ключ, значения и их типы данных, в то время как flatMapValues()
сохраняет исходные ключи и изменяет значение и тип данных значения.
Вот пример псевдокода с учетом того, что новые сообщения «C», «D», «E» будут иметь новый ключ.
KStream<byte[], String> inputStream = builder.stream("inputTopic");
KStream<byte[], String> outStream = inputStream.flatMap(
(key,value)->{
List<KeyValue<byte[], String>> result = new LinkedList<>();
// If message value is "B". Otherwise place your condition based on data
if(value.equalsTo("B")){
result.add(KeyValue.pair("<new key for message C>","C"));
result.add(KeyValue.pair("<new key for message D>","D"));
result.add(KeyValue.pair("<new key for message E>","E"));
}else{
result.add(KeyValue.pair(key,value));
}
return result;
});
outStream.to("sinkTopic");
Подробнее об этом можно прочитать: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-transformations-stateless