Я пытаюсь выполнить объединение потоков Кафки. У меня есть два потока клиентов и порядок. Ниже приведен фрагмент кода:
final String customerTopic = Constants.CUSTOMER_TOPIC;
final String orderTopic = Constants.ORDER_TOPIC;
Serde<Customer> customerSerde = Serdes.serdeFrom(new CustomerSerializer(), new CustomerDeserializer());
Serde<Order> orderSerde = Serdes.serdeFrom(new OrderSerializer(), new OrderDeserializer());
Serde<Long> longSerde = Serdes.Long();
KStream<Long, Customer> customerStream = builder.stream(customerTopic, Consumed.with(longSerde, customerSerde));
KStream<Long, Order> orderStream = builder.stream(orderTopic, Consumed.with(longSerde, orderSerde))
.selectKey((k, v) -> v.getCustomerId());
KStream<Long, CustomerOrder> joinedStream = orderStream.join(customerStream,
(order, customer) -> new CustomerOrder(customer, order), JoinWindows.of(Duration.ofSeconds(120)),
Joined.with(longSerde, orderSerde, customerSerde));
Во время работы приложения я получаю следующее исключение:
java.lang.ClassCastException: poc.kafka.domain.Order cannot be cast to poc.kafka.domain.Customer
at poc.kafka.domain.serialization.CustomerDeserializer.deserialize(CustomerDeserializer.java:12)
at poc.kafka.domain.serialization.CustomerDeserializer.deserialize(CustomerDeserializer.java:1)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:97)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)