Присоединение к потокам кафки, содержащим объекты - PullRequest
0 голосов
/ 20 июня 2019

Опыт моего первого объединения потоков Кафка, я пытаюсь просто обернуть два объекта в новый объект.Но я все еще получаю ошибку, которую я не понимаю:

@StreamListener
@SendTo("t3_joined_out")
public KStream<String, MyValueContainer> process(
        @Input("t2_cashflow_stream_in") KStream<String, Cashflow> cashflowStream,
        @Input("t2_contract_stream_in") KTable<String, Contract> contractTable) {

    return  cashflowStream
            .leftJoin(contractTable, (cashflow, contract) -> new MyValueContainer(cashflow, contract),              
                    Joined.with(Serdes.String(), new JsonSerde(Cashflow.class), new JsonSerde(Contract.class)))
            ;
}

Кажется, что я использовал неправильный Serde для объекта "Контракт", но я не понимаю его.Кроме того, я никогда не получаю трассировку стека, которая отслеживает мои собственные строки кода.Есть что делать?

Ошибка

2019-06-20 22:28:10,287  INFO kafka-producer-network-thread | producer-5 o.a.k.c.Metadata:285 - Cluster ID: 95g5Kjf7RoCKudHla5l7fA
2019-06-20 22:28:10,377 ERROR stream-table-sample-0178341b-1c0d-4f5a-b058-4d679303c87d-StreamThread-1 o.a.k.s.p.i.AssignedStreamsTasks:107 - stream-thread [stream-table-sample-0178341b-1c0d-4f5a-b058-4d679303c87d-StreamThread-1] Failed to process stream task 0_0 due to the following error:
java.lang.ClassCastException: [B cannot be cast to tki.bigdata.pojo.Contract
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:73)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

1 Ответ

0 голосов
/ 25 июня 2019

Вот небольшой образец , извлеченный из предоставленного вами хранилища. Я проверил, что соединение KStream-KTable работает. Пожалуйста, посмотрите на конфигурацию и посмотрите, сможете ли вы внести аналогичные изменения в ваше приложение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...