Kafka Streams группировка и конкатенация - PullRequest
1 голос
/ 03 апреля 2019

У меня есть поток Kafka, который получает записи, и я хочу объединить сообщения на основе определенного поля.

Сообщение в потоке выглядит следующим образом:

Key: 2099
Payload{
  email: tom@emample.com
  eventCode: 2099
}

Ожидаемый результат:

key: 2099
Payload{
    emails: tom@example, bill@acme.com, jane@example.com
}

Я могу заставить поток работать нормально, я просто не уверен, что должна содержать лямда.

Это то, что я сделал до сих пор.Я не уверен, следует ли мне использовать карту, объединить или уменьшить или сочетание этих операций.

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);

inputStream
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))

                                  // Not sure what to do here …..

}).to (OUTPUT_TOPIC );

1 Ответ

4 голосов
/ 03 апреля 2019

Это может быть что-то вроде этого

inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
.aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {
        @Override
        public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {
            result.setKey(key);
            if(result.getEmails()==null){
                result.setEmails(newValue.getEmail());
            }else{
                result.setEmails(result.getEmails() + "," + newValue.getEmail());
            }
            return result;
        }
    }, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...