Потоки Кафки - Json - PullRequest
       39

Потоки Кафки - Json

0 голосов
/ 06 ноября 2019

Я думаю сделать следующее.

  1. В Topic1 есть данные JSON

    Пример: {"name":"alice","userid":82,"emailid":"abc@xx.com"}

  2. Тема 2 содержит данные JSON

    Пример: {"address":"xxxx","userid":82,"phone":"xxxx"}

Учитывая, что ключ не определен, то есть егоnull по обеим темам.

Я хотел бы получить данные и присоединиться к ним на основе идентификатора пользователя - я понимаю код строковых данных.

GlobalKTable<String, String> usersGlobalTable = builder.globalTable("user-table");

// we get a stream of user purchases
KStream<String, String> userPurchases = builder.stream("user-purchases");

// we want to enrich that stream
KStream<String, String> userPurchasesEnrichedJoin =
        userPurchases.join(usersGlobalTable,
                (key, value) -> key, /* map from the (key, value) of this stream to the key of the GlobalKTable */
                (userPurchase1, userInfo1) -> "Purchase=" + userPurchase1 + ",UserInfo=[" + userInfo1 + "]"
        );

userPurchasesEnrichedJoin.to("user-purchases-enriched-inner-join");

Может кто-нибудь помочь с фрагментом кодадля JSON или любого руководства, пожалуйста.

Спасибо за вашу помощь.

...