Как выбрать нулевые значения в данных формата String после присоединения к Kstream - PullRequest
0 голосов
/ 04 июня 2018

Я выполнил операции объединения двух kstream, которые состоят из данных формата avro, а затем мой ключ имеет тип Integer, а значения имеют тип string.Выходные данные выглядят так:

[KSTREAM-MERGE-0000000016]: 1, {"id": 1, "name": "john", "age": 26}/{"id":1, "name": "d", "age": 67}
[KSTREAM-MERGE-0000000016]: 2, {"id": 2, "name": "jane", "age": 24}/{"id": 2, "name": "e", "age": 78}
[KSTREAM-MERGE-0000000016]: 3, {"id": 3, "name": "julia", "age": 25}/{"id": 3, "name": "h", "age": 12}
[KSTREAM-MERGE-0000000016]: 4, {"id": 4, "name": "jamie", "age": 22}/null
[KSTREAM-MERGE-0000000016]: 5, {"id": 5, "name": "jenny", "age": 27}/null
[KSTREAM-MERGE-0000000016]: 6, {"id": 6, "name": "kishore", "age": 27}/null
[KSTREAM-MERGE-0000000016]: 7, {"id": 7, "name": "purna", "age": 27}/null
[KSTREAM-MERGE-0000000016]: 8, {"id": 8, "name": "xxx", "age": 10}/null
[KSTREAM-MERGE-0000000016]: 9, {"id": 9, "name": "yyy", "age": 10}/null
[KSTREAM-MERGE-0000000016]: 10, {"id": 10, "name": "zzz", "age": 10}/null

Теперь я хочу отфильтровать значения, где в конце есть ноль.мой ожидаемый результат должен быть как:

[KSTREAM-MERGE-0000000016]: 4, {"id": 4, "name": "jamie", "age": 22}/null
[KSTREAM-MERGE-0000000016]: 5, {"id": 5, "name": "jenny", "age": 27}/null
[KSTREAM-MERGE-0000000016]: 6, {"id": 6, "name": "kishore", "age": 27}/null
[KSTREAM-MERGE-0000000016]: 7, {"id": 7, "name": "purna", "age": 27}/null
[KSTREAM-MERGE-0000000016]: 8, {"id": 8, "name": "xxx", "age": 10}/null
[KSTREAM-MERGE-0000000016]: 9, {"id": 9, "name": "yyy", "age": 10}/null
[KSTREAM-MERGE-0000000016]: 10, {"id": 10, "name": "zzz", "age": 10}/null

1 Ответ

0 голосов
/ 04 июня 2018

прежде всего удалите нулевое значение

    KStream<Long, String> stream = streamsBuilder
        .stream("TOPIC1", Consumed.with(Long(), Serdes.String()))
        .filter((k, v) -> v != null);

    KStream<Long, String> stream2 = streamsBuilder
        .stream("TOPIC2", Consumed.with(Long(), Serdes.String()))
        .filter((k, v) -> v != null);

    stream.join(stream2, (value1, value2) -> value1 + "/" + value2, JoinWindows.of(1000));
...