У меня есть поток Kafka, который получает записи, и я хочу объединить сообщения на основе определенного поля.
Сообщение в потоке выглядит следующим образом:
Key: 2099
Payload{
email: tom@emample.com
eventCode: 2099
}
Ожидаемый результат:
key: 2099
Payload{
emails: tom@example, bill@acme.com, jane@example.com
}
Я могу заставить поток работать нормально, я просто не уверен, что должна содержать лямда.
Это то, что я сделал до сих пор.Я не уверен, следует ли мне использовать карту, объединить или уменьшить или сочетание этих операций.
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);
inputStream
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
// Not sure what to do here …..
}).to (OUTPUT_TOPIC );