Kafka Stream создает собственный список сообщений на определенных условиях - PullRequest
0 голосов
/ 05 июля 2019

У нас есть следующие требования к обработке потока.

Source Stream -> 
 transform(condition check - If (true) then generate MULTIPLE ADDITIONAL messages else just transform the incoming message) ->
 output kafka topic

Example:
If condition is true for message B(D,E,F are the additional messages produced)
A,B,C -> A,D,E,F,C -> Sink Kafka Topic
If condition is false     
A,B,C -> A,B,C -> Sink Kafka Topic

Есть ли способ достичь этого в потоках Кафки?

1 Ответ

2 голосов
/ 05 июля 2019

Вы можете использовать flatMap() или flatMapValues() методы.Эти методы берут одну запись и дают ноль, одну или несколько записей.

flatMap() может изменять ключ, значения и их типы данных, в то время как flatMapValues() сохраняет исходные ключи и изменяет значение и тип данных значения.

Вот пример псевдокода с учетом того, что новые сообщения «C», «D», «E» будут иметь новый ключ.

KStream<byte[], String> inputStream = builder.stream("inputTopic");
KStream<byte[], String> outStream = inputStream.flatMap( 
           (key,value)->{
            List<KeyValue<byte[], String>> result = new LinkedList<>();  
                // If message value is "B". Otherwise place your condition based on data     
                if(value.equalsTo("B")){ 
                      result.add(KeyValue.pair("<new key for message C>","C"));
                      result.add(KeyValue.pair("<new key for message D>","D"));
                      result.add(KeyValue.pair("<new key for message E>","E"));

                 }else{
                         result.add(KeyValue.pair(key,value));
                 }
            return result;
});
outStream.to("sinkTopic");

Подробнее об этом можно прочитать: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-transformations-stateless

...