Я получаю сообщения от Кафки.Я получаю два сообщения с одинаковым идентификатором и пытаюсь объединить их в одно.
Это работает, но я получил только одно сообщение
KStream<String, byte[]> stream = streamsBuilder.stream(topic);
stream.selectKey((k, v) -> {
Map<String, String> headers = this.getHeaders(v);
return ParserService.getFieldValue(headers, "api_message_id");
})
.groupByKey()
.aggregate(() -> new byte[]{}, (aggKey, newValue, aggValue) -> {
byte[] c = new byte[aggValue.length + newValue.length];
System.arraycopy(aggValue, 0, c, 0, aggValue.length);
System.arraycopy(newValue, 0, c, aggValue.length, newValue.length);
return c;
})
.toStream()
.foreach((k, v) -> {
Map<String, String> headers = this.getHeaders(v);
parserService.processGroupedMessage(getHeaders(v));
});