Kafka Stream и KGlobalTable Присоединяются к выпуску - PullRequest
1 голос
/ 04 июня 2019

Я сталкиваюсь с проблемой присоединения KStream к GlobalKTable и буду признателен за вашу помощь.

Учитывая две темы Кафки orders и customers:

Заказы

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

Клиенты

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

Требуется обогатить поток заказов именем клиента

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}

Я пытаюсь сделать следующее:

  1. Сборка KStream из темы orders
  2. Создание GlobalKTable из темы customers
  3. Создайте еще один поток, объединяющий Заказы и Клиентов (посмотрите Order.CustID в таблице Customer)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

Но это не дает никакого результата и не выдает никаких исключений.

При использовании leftJoin я получаю исключение NullPointer для Customer.

Пожалуйста, дайте мне знать, если вы столкнулись с подобной проблемой, и предложите, как это исправить.

Ответы [ 2 ]

1 голос
/ 07 июня 2019

Давайте внимательно посмотрим на содержимое вашей копии-пасты:

В теме customers:

"100"   {"CustID":"100","CustName":"Customer1"}

Вы можете заметить, что ключ является строкой, а эта строка содержит двойные кавычки : "100".Обычно строковые ключи печатаются без двойных кавычек.Я бы предпочел увидеть:

 100    {"CustID":"100","CustName":"Customer1"}

Другими словами, представление вашего ключа в Java String равно ""100"" (или "\"100\""), а не "100", как мы ожидаем.

С другой стороны, значение в вашей теме orders - это Json {"ID":"1","Name":"Myorder1","CustID":"100"}, а атрибут CustID - это строка, на этот раз представленная в Java "100".

Когдавы присоединяетесь к orders и customers, пытаетесь сопоставить заказы CustID 100 с ключом клиента "100".И это не удастся из-за двойных кавычек в ключе, которые отсутствуют в CustID.

0 голосов
/ 06 июня 2019

@ deepak вам может понадобиться материализовать вашу таблицу KTable

builder.table(customers, Materialized.as(customerStore));

Затем направить заказы и построить ваше объединение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...