Apache Kafka Streams join: java.lang.ClassCastException at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
January 15, 2020

I am trying to join two Kafka streams: a customer stream and an order stream. Here is the code snippet:

    final String customerTopic = Constants.CUSTOMER_TOPIC;
    final String orderTopic = Constants.ORDER_TOPIC;

    Serde<Customer> customerSerde = Serdes.serdeFrom(new CustomerSerializer(), new CustomerDeserializer());
    Serde<Order> orderSerde = Serdes.serdeFrom(new OrderSerializer(), new OrderDeserializer());
    Serde<Long> longSerde = Serdes.Long();

    KStream<Long, Customer> customerStream = builder.stream(customerTopic, Consumed.with(longSerde, customerSerde));
    KStream<Long, Order> orderStream = builder.stream(orderTopic, Consumed.with(longSerde, orderSerde))
            .selectKey((k, v) -> v.getCustomerId());

    KStream<Long, CustomerOrder> joinedStream = orderStream.join(customerStream,
            (order, customer) -> new CustomerOrder(customer, order), JoinWindows.of(Duration.ofSeconds(120)),
            Joined.with(longSerde, orderSerde, customerSerde));

While the application is running, I get the following exception:

java.lang.ClassCastException: poc.kafka.domain.Order cannot be cast to poc.kafka.domain.Customer
    at poc.kafka.domain.serialization.CustomerDeserializer.deserialize(CustomerDeserializer.java:12)
    at poc.kafka.domain.serialization.CustomerDeserializer.deserialize(CustomerDeserializer.java:1)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:56)
    at org.apache.kafka.streams.state.internals.MeteredWindowStoreIterator.next(MeteredWindowStoreIterator.java:26)
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:97)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
...
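
The trace shows the cast failing inside CustomerDeserializer.deserialize, which suggests that bytes written as an Order were read back from the join window store with the Customer deserializer. Below is a minimal, Kafka-free sketch of that kind of failure. It assumes the custom serializers use plain Java object serialization, which the snippet above does not show; `CastDemo`, its nested `Customer` and `Order` classes, and the helper methods are hypothetical stand-ins, not the real poc.kafka.domain classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CastDemo {
    // Hypothetical stand-ins for the domain classes in the question.
    static class Customer implements Serializable {
        private static final long serialVersionUID = 1L;
        long id = 1L;
    }

    static class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        long customerId = 1L;
    }

    // Assumed serializer shape: plain Java object serialization.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Assumed deserializer shape: decode the object, then cast to Customer.
    static Customer deserializeCustomer(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Customer) in.readObject(); // throws ClassCastException if the bytes hold an Order
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true when decoding the bytes as a Customer fails with ClassCastException.
    static boolean failsWithClassCast(byte[] data) {
        try {
            deserializeCustomer(data);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast(serialize(new Order())));    // prints "true"
        System.out.println(failsWithClassCast(serialize(new Customer()))); // prints "false"
    }
}
```

If the real deserializers have this shape, the exception means the store being iterated holds Order bytes where Customer bytes are expected. The sketch does not show how they got there.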