Я сталкиваюсь с проблемой присоединения KStream к GlobalKTable и буду признателен за вашу помощь.
Учитывая две темы Кафки orders
и customers
:
Заказы
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
Клиенты
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
Требуется обогатить поток заказов именем клиента
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
Я пытаюсь сделать следующее:
- Сборка KStream из темы
orders
- Создание GlobalKTable из темы
customers
- Создайте еще один поток, объединяющий Заказы и Клиентов (посмотрите Order.CustID в таблице Customer)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
Но это не дает никакого результата и не выдает никаких исключений.
При использовании leftJoin
я получаю исключение NullPointer для Customer.
Пожалуйста, дайте мне знать, если вы столкнулись с подобной проблемой, и предложите, как это исправить.