Я создал для KStreams, что хочу присоединиться к ним вместе. Выходные данные двух потоков следующие:
Поток 1:
2 {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}
Поток 2:
26 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}
Я хочу создать объединенный поток (внутреннее объединение) этих двух потоков, поэтому я создал следующий KStream:
KStream<String, String> s_joined = s_order
.join(s_order_item, (left,right) -> left + right,
JoinWindows.of(Duration.ofSeconds(30)))
.mapValues(value -> {
String[] arrOfstr = value.split("(?<=})");
JSONObject jl = new JSONObject(arrOfstr[0]);
JSONObject jr = new JSONObject(arrOfstr[1]);
JSONObject json = new JSONObject();
Iterator<String> keys = jl.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jl.get(key));
}
keys = jr.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jr.get(key));
}
return json.toString();
});
В этом KStream я просто использую соединение и меняю формат выходное сообщение, ничего более.
На одном примере я объясню, что я хочу сделать:
В окне публикуются следующие сообщения:
Stream 1
9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}
Поток 2
9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
Объединенный поток
Что опубликовано
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
То, что я хочу опубликовать
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
В заключение я хочу опубликовать sh только последнее сообщение в окне , не все они. Это возможно?