Как объединить два HashMapиспользуя методы KStream? - PullRequest
0 голосов
/ 13 июня 2019

Я получаю сообщения от Кафки.Я получаю два сообщения с одинаковым идентификатором и пытаюсь объединить их в одно.

Это работает, но я получил только одно сообщение

        KStream<String, byte[]> stream = streamsBuilder.stream(topic);

        stream.selectKey((k, v) -> {
            Map<String, String> headers = this.getHeaders(v);
            return ParserService.getFieldValue(headers, "api_message_id");
        })
                .groupByKey()
                .aggregate(() -> new byte[]{}, (aggKey, newValue, aggValue) -> {
                    byte[] c = new byte[aggValue.length + newValue.length];
                    System.arraycopy(aggValue, 0, c, 0, aggValue.length);
                    System.arraycopy(newValue, 0, c, aggValue.length, newValue.length);
                    return c;
                })
                .toStream()
                .foreach((k, v) -> {
                    Map<String, String> headers = this.getHeaders(v);
                    parserService.processGroupedMessage(getHeaders(v));
                });

...