Как объединить 2 потока в кафке? - PullRequest
0 голосов
/ 28 мая 2018

Изучение потоков Кафки, попытки объединить два потока (значение Json) в 5-минутном окне.Насколько я понимаю, у меня должен быть одинаковый ключ для значений, соответствующих критериям соединенияЕсли мое понимание правильное, то как ключи только присоединяются, верно?Если да, то как мне присоединить значения JSON.Т.е. Stream1: Key=a, value={a,b,c}. Stream2: Key=a, value={x} and key=a, value={y}. Expected o/p: {a,b,c,x} and {a,b,c,y}.

Чтобы добиться этого, как должен выглядеть мой ValueJoiner.Помоги мне с этим.Мой пример кода:

KStream<String, JsonNode> resultStream = stream1.leftJoin(stream2,
                new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
                    @Override
                    public JsonNode apply(JsonNode value1, JsonNode value2) {
                        if (value1 != null && value2 != null) {


                            return value1;
                        }
                        return null;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(20)), Joined.with(Serdes.String(), /* key */
                        jsonSerde, /* left value */
                        jsonSerde) /* right value */
        );

1 Ответ

0 голосов
/ 28 мая 2018

Ваше понимание правильности работы объединений (при условии, что метка времени записи отличается от размера окна объединения).

Для управления JsonNodes просто выполните поиск в Интернете: Как изменитьJsonNode в Java?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...