Kafka Streams - KStream to KTable Inner Join - PullRequest
14 January 2020

I have created one KStream and one KTable that read messages from different topics. I then join them into another KStream with .join(), which the documentation describes as an inner join. What confuses me is that the output looks as if a left join is actually taking place.
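To pin down the semantics being compared, here is a small in-memory model. It is not the Kafka Streams API: the class StreamTableJoinModel and its methods are hypothetical and use only the JDK. In a KStream-KTable inner join, each stream record is looked up against the table's current value for its key and is dropped when the table has none. A left join keeps the record and passes null to the joiner as the right-hand value.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** In-memory model (NOT the Kafka Streams API) of KStream-KTable join semantics. */
public class StreamTableJoinModel {

    /** A keyed record; a stand-in for a Kafka record in this model. */
    public static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    /** Inner join: a stream record whose key has no table value is dropped. */
    public static List<Map.Entry<String, String>> innerJoin(
            List<Map.Entry<String, String>> stream,
            Map<String, String> table,
            BinaryOperator<String> joiner) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : stream) {
            String right = table.get(r.getKey());
            if (right != null) {
                out.add(rec(r.getKey(), joiner.apply(r.getValue(), right)));
            }
        }
        return out;
    }

    /** Left join: unmatched stream records are kept and the joiner receives right == null. */
    public static List<Map.Entry<String, String>> leftJoin(
            List<Map.Entry<String, String>> stream,
            Map<String, String> table,
            BinaryOperator<String> joiner) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : stream) {
            String right = table.get(r.getKey());
            out.add(rec(r.getKey(), joiner.apply(r.getValue(), right)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = Map.of("4", "{\"QUANTITY\":8}");
        List<Map.Entry<String, String>> stream = List.of(
                rec("4", "{\"CODE\":\"AAAA18\"}"),
                rec("23", "{\"CODE\":\"AAAA55\"}"));
        BinaryOperator<String> concat = (l, r) -> l + r;
        System.out.println(innerJoin(stream, table, concat)); // only key 4
        System.out.println(leftJoin(stream, table, concat));  // keys 4 and 23
    }
}
```

Under this model, a short record for a key with no table entry can only come out of the left join, which is why the output below looks surprising.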

Below are the KStream and KTable implementations together with their output.

KStream:

KStream<String, String> s_order = builder.stream("s-order-topic", Consumed.with(Serdes.String(), Serdes.String()))
                // Unwrap the change event: the "after" image for inserts (op_type "I"), the "before" image otherwise
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    } else {
                        time = "before";
                    }
                    return json.getJSONObject(time).toString();
                })
                // Re-key by the order ID so the stream key matches the KTable key
                .selectKey((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return String.valueOf(json.getLong("ID"));
                });
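The unwrap-and-rekey step can be modelled with plain maps to show which image each operation resolves to. This is a sketch under assumptions: java.util.Map stands in for org.json's JSONObject, EnvelopeModel is a hypothetical name, and before/after are assumed to hold the old and new row images, as in common change-data-capture envelopes. Note that with this rule every non-insert operation, updates included, resolves to its "before" image.

```java
import java.util.Map;

/** Stdlib model of the s_order mapValues/selectKey steps (Map stands in for JSONObject). */
public class EnvelopeModel {

    /** Same rule as the post: the "after" image for op_type "I", the "before" image otherwise. */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> unwrap(Map<String, Object> envelope) {
        String image = "I".equals(envelope.get("op_type")) ? "after" : "before";
        return (Map<String, Object>) envelope.get(image);
    }

    /** Same as the post's selectKey: the new key is the row's numeric ID as a string. */
    public static String newKey(Map<String, Object> row) {
        return String.valueOf(((Number) row.get("ID")).longValue());
    }

    public static void main(String[] args) {
        // An update event: with the rule above it resolves to the "before" image.
        Map<String, Object> update = Map.of(
                "op_type", "U",
                "before", Map.of("ID", 4L, "STATUS", "SUBMITTED"),
                "after", Map.of("ID", 4L, "STATUS", "CANCELED"));
        Map<String, Object> row = unwrap(update);
        System.out.println(newKey(row) + " " + row.get("STATUS")); // prints "4 SUBMITTED"
    }
}
```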

Output:

23 {"CODE":"AAAA55","STATUS":"SUBMITTED","ID":23}
4 {"CODE":"AAAA18","STATUS":"CANCELED","ID":4}
2 {"CODE":"AAAA32","STATUS":"SUBMITTED","ID":2}
24 {"CODE":"AAAA11","STATUS":"NOT COMPLETED","ID":24}
16 {"CODE":"AAAA93","STATUS":"CANCELED","ID":16}
19 {"CODE":"AAAA65","STATUS":"SUBMITTED","ID":19}
25 {"CODE":"AAAA71","STATUS":"COMPLETED","ID":25}
3 {"CODE":"AAAA91","STATUS":"NOT COMPLETED","ID":3}
17 {"CODE":"AAAA31","STATUS":"SUBMITTED","ID":17}
1 {"CODE":"AAAA89","STATUS":"NOT COMPLETED","ID":1}

KTable:

KTable<String, String> s_item_table = builder.stream("s-order-item-topic", Consumed.with(Serdes.String(), Serdes.String()))
                // Same envelope unwrapping as for s_order
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    } else {
                        time = "before";
                    }
                    return json.getJSONObject(time).toString();
                })
                // Re-key by ID (Kafka Streams repartitions the data for the grouping)
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return String.valueOf(json.getLong("ID"));
                }, Grouped.with(Serdes.String(), Serdes.String()))
                // Keep only the newest value per key
                .reduce((prev, newval) -> newval);
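Because reduce((prev, newval) -> newval) keeps only the newest value per key, the table holds one value per ID even though the listing above shows repeated keys (2, 17, 24), and a stream record is matched against whatever value the table holds when that record is processed. The sketch below models this with a HashMap; LatestValueTable is a hypothetical name, not a Kafka Streams class. It is consistent with the joined output for key 2 carrying GR0100326 rather than the later GR010084.

```java
import java.util.HashMap;
import java.util.Map;

/** Stdlib model of a KTable built with reduce((prev, newval) -> newval). */
public class LatestValueTable {
    private final Map<String, String> state = new HashMap<>();

    /** Same effect as the post's reducer: the newest value for a key replaces the previous one. */
    public void update(String key, String value) {
        state.merge(key, value, (prev, newval) -> newval);
    }

    /** Stream-side lookup: the value held at this moment, or null if the key is absent. */
    public String lookup(String key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        LatestValueTable items = new LatestValueTable();
        items.update("2", "GR0100326");
        String joinedEarly = items.lookup("2"); // a stream record for key 2 processed here
        items.update("2", "GR010084");
        String joinedLate = items.lookup("2");  // a stream record for key 2 processed here
        System.out.println(joinedEarly + " " + joinedLate); // prints "GR0100326 GR010084"
        System.out.println(items.lookup("23"));             // prints "null": no match for key 23
    }
}
```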

Output:

17 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100519","ID":17}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100575","ID":9}
24 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100850","ID":24}
27 {"DESCRIPTION":"blah blah blah","QUANTITY":9,"ID_CUSTOMER_ORDER":"GR0100953","ID":27}
25 {"DESCRIPTION":"blah blah blah","QUANTITY":5,"ID_CUSTOMER_ORDER":"GR0100577","ID":25}
22 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100639","ID":22}
24 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100974","ID":24}
19 {"DESCRIPTION":"blah blah blah","QUANTITY":9,"ID_CUSTOMER_ORDER":"GR0100674","ID":19}
15 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100151","ID":15}
2 {"DESCRIPTION":"blah blah blah","QUANTITY":4,"ID_CUSTOMER_ORDER":"GR0100326","ID":2}
1 {"DESCRIPTION":"blah blah blah","QUANTITY":5,"ID_CUSTOMER_ORDER":"GR0100913","ID":1}
17 {"DESCRIPTION":"blah blah blah","QUANTITY":9,"ID_CUSTOMER_ORDER":"GR0100448","ID":17}
29 {"DESCRIPTION":"blah blah blah","QUANTITY":2,"ID_CUSTOMER_ORDER":"GR0100238","ID":29}
5 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100687","ID":5}
4 {"DESCRIPTION":"blah blah blah","QUANTITY":8,"ID_CUSTOMER_ORDER":"GR0100580","ID":4}
6 {"DESCRIPTION":"blah blah blah","QUANTITY":8,"ID_CUSTOMER_ORDER":"GR0100632","ID":6}
17 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100365","ID":17}
7 {"DESCRIPTION":"blah blah blah","QUANTITY":8,"ID_CUSTOMER_ORDER":"GR0100974","ID":7}
2 {"DESCRIPTION":"blah blah blah","QUANTITY":8,"ID_CUSTOMER_ORDER":"GR010084","ID":2}

Joined stream:

KStream<String, String> s_joined = s_order
                // KStream-KTable inner join; the joiner concatenates the two JSON strings
                .join(s_item_table, (left, right) -> left + right)
                .mapValues(value -> {
                    // Split after each '}' to get the two objects back (only works for flat JSON)
                    String[] arrOfstr = value.split("(?<=})");
                    JSONObject jl = new JSONObject(arrOfstr[0]);
                    JSONObject jr = new JSONObject(arrOfstr[1]);
                    JSONObject json = new JSONObject();
                    // Copy the stream-side fields, then the table-side fields (duplicates such as ID are overwritten)
                    Iterator<String> keys = jl.keys();
                    while (keys.hasNext()) {
                        String key = keys.next();
                        json.put(key, jl.get(key));
                    }
                    keys = jr.keys();
                    while (keys.hasNext()) {
                        String key = keys.next();
                        json.put(key, jr.get(key));
                    }
                    return json.toString();
                });
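The concatenate-then-split approach has two properties worth checking against the short records. If the table value for a key were an empty object {}, the inner join would still fire, yet the merged record would contain only the stream's fields. And the split only works for flat JSON, because a nested object adds extra '}' boundaries. The sketch below shows both with the JDK alone; ConcatSplitDemo is a hypothetical name, and java.util.Map stands in for org.json's JSONObject.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConcatSplitDemo {

    /** Same regex as the post: a zero-width split right after every '}' character. */
    public static String[] splitJoined(String joined) {
        return joined.split("(?<=})");
    }

    /** Same effect as the two key-copy loops: left fields first, right fields overwrite duplicates. */
    public static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right) {
        Map<String, Object> out = new LinkedHashMap<>(left);
        out.putAll(right);
        return out;
    }

    public static void main(String[] args) {
        // An empty right-hand object still splits cleanly and contributes no fields.
        System.out.println(Arrays.toString(splitJoined("{\"ID\":23}{}"))); // prints [{"ID":23}, {}]
        System.out.println(merge(Map.of("ID", 23), Map.of()));             // prints {ID=23}

        // A nested object adds an extra '}' boundary, so arrOfstr[1] would no longer be the right-hand value.
        System.out.println(splitJoined("{\"A\":{\"B\":1}}{\"C\":2}").length); // prints 3
    }
}
```

Merging the two values directly inside the ValueJoiner, instead of concatenating strings and splitting them again, would avoid the second problem altogether.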

Output:

23 {"CODE":"AAAA82","STATUS":"SUBMITTED","ID":23}
7 {"CODE":"AAAA74","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100832","ID":7}
11 {"CODE":"AAAA55","STATUS":"PENDING","DESCRIPTION":"blah blah blah","QUANTITY":8,"ID_CUSTOMER_ORDER":"GR0100206","ID":11}
23 {"CODE":"AAAA82","STATUS":"SUBMITTED","ID":23}
4 {"CODE":"AAAA18","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100182","ID":4}
2 {"CODE":"AAAA32","STATUS":"SUBMITTED","DESCRIPTION":"blah blah blah","QUANTITY":4,"ID_CUSTOMER_ORDER":"GR0100326","ID":2}
24 {"CODE":"AAAA11","STATUS":"NOT COMPLETED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100974","ID":24}
16 {"CODE":"AAAA87","STATUS":"SUBMITTED","ID":16}
19 {"CODE":"AAAA65","STATUS":"SUBMITTED","DESCRIPTION":"blah blah blah","QUANTITY":9,"ID_CUSTOMER_ORDER":"GR0100674","ID":19}
25 {"CODE":"AAAA71","STATUS":"COMPLETED","DESCRIPTION":"blah blah blah","QUANTITY":5,"ID_CUSTOMER_ORDER":"GR0100577","ID":25}
3 {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":3}
17 {"CODE":"AAAA31","STATUS":"SUBMITTED","DESCRIPTION":"blah blah blah","QUANTITY":9,"ID_CUSTOMER_ORDER":"GR0100448","ID":17}
1 {"CODE":"AAAA89","STATUS":"NOT COMPLETED","DESCRIPTION":"blah blah blah","QUANTITY":5,"ID_CUSTOMER_ORDER":"GR0100913","ID":1}

As you can see, the short messages (keys 23, 16 and 3) were emitted even though the KTable had no value for those keys; only the fields from the stream were used. That is not an inner join. Am I missing something?

...