Apache Kafka - KStream with KStream Присоединиться к последним сообщениям - PullRequest
0 голосов
/ 15 января 2020

Я создал для KStreams, что хочу присоединиться к ним вместе. Выходные данные двух потоков следующие:

Поток 1:

2    {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}

Поток 2:

26   {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}

Я хочу создать объединенный поток (внутреннее объединение) этих двух потоков, поэтому я создал следующий KStream:

KStream<String, String> s_joined = s_order
        .join(s_order_item, (left,right) -> left + right,
                JoinWindows.of(Duration.ofSeconds(30)))
        .mapValues(value -> {
            String[] arrOfstr = value.split("(?<=})");
            JSONObject jl = new JSONObject(arrOfstr[0]);
            JSONObject jr = new JSONObject(arrOfstr[1]);
            JSONObject json = new JSONObject();
            Iterator<String> keys = jl.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jl.get(key));
            }
            keys = jr.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jr.get(key));
            }
            return json.toString();
        });

В этом KStream я просто использую соединение и меняю формат выходное сообщение, ничего более.

На одном примере я объясню, что я хочу сделать:

В окне публикуются следующие сообщения:

Stream 1

9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}

Поток 2

9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

Объединенный поток

Что опубликовано

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

То, что я хочу опубликовать

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

В заключение я хочу опубликовать sh только последнее сообщение в окне , не все они. Это возможно?

Ответы [ 2 ]

0 голосов
/ 05 февраля 2020

Я нашел ответ. Чтобы добиться того, что я хочу сделать, используйте функцию suppress. Более подробно, вы groupByKey() KStream и затем используете функцию Window. Наконец, агрегируйте сгруппированные данные и используйте suppress.

s_joined.toStream()
        .groupByKey()
        .WindowedBy(...)
        .aggregate(...)
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

0 голосов
/ 15 января 2020

Вы можете использовать функцию groupByKey, которая возвращает KGroupedStream, а затем преобразовать ее требуемым образом с помощью map/reduce функций для этого. Пожалуйста, смотрите Kafka Streams DSL для получения дополнительной информации.

...