Я новичок в API Kafka Streams и пытаюсь создать KTable. У меня есть ввод topi c: s-order-topic
, который представляет собой сообщение в формате json, как показано ниже.
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
Я читаю сообщения из этой топи c и хочу создать KTable , которая имеет ключ , поле "after":"ID"
и для значение все поля внутри поля "after"
(кроме "ID"
).
Я успешно создал таблицу KTable только тогда, когда использую агрегатные функции по умолчанию, то есть count. Но у меня возникают трудности при создании собственной агрегатной функции. Ниже я представляю ту часть кода, в которой я пытаюсь создать KTable.
KTable<Long, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.Long(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getLong("ID");
}, Grouped.with(Serdes.Long(), Serdes.String()))
.aggregate( ... );
Как мне реализовать этот KTable?
Правильно ли я подхожу к проблеме?
(mapValues -> оставить только поле «до» / «после». GroupBy -> сделать идентификатор ключом сообщения. Aggregate ->?)