Apache Кафка - Внедрение KTable - PullRequest
0 голосов
/ 07 января 2020

Я новичок в API Kafka Streams и пытаюсь создать KTable. У меня есть ввод topi c: s-order-topic, который представляет собой сообщение в формате json, как показано ниже.

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

Я читаю сообщения из этой топи c и хочу создать KTable , которая имеет ключ , поле "after":"ID" и для значение все поля внутри поля "after" (кроме "ID").

Я успешно создал таблицу KTable только тогда, когда использую агрегатные функции по умолчанию, то есть count. Но у меня возникают трудности при создании собственной агрегатной функции. Ниже я представляю ту часть кода, в которой я пытаюсь создать KTable.

KTable<Long, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.Long(),Serdes.String()))
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                })
               .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getLong("ID");
                }, Grouped.with(Serdes.Long(), Serdes.String()))
                .aggregate( ... );

Как мне реализовать этот KTable?

Правильно ли я подхожу к проблеме?

(mapValues ​​-> оставить только поле «до» / «после». GroupBy -> сделать идентификатор ключом сообщения. Aggregate ->?)

1 Ответ

1 голос
/ 08 января 2020

Я нашел решение для моего случая. Я реализовал таблицу KTable, как показано ниже:

 KTable<String, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.String(),Serdes.String()))
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                })
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return String.valueOf(json.getLong("ID"));
                }, Grouped.with(Serdes.String(), Serdes.String()))
                .reduce((prev,newval)->newval);

Функция aggregate не подходит для этого случая, вместо этого я использовал функцию reduce.

Вывод от потребителя консоли показано ниже:

15   {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
18   {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
4    {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
19   {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
18   {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
5    {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
10   {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
3    {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
9    {"CODE":"AAAA89","STATUS":"PENDING","ID":9}
...