ClassCastException в spring-kafka-test с использованием `merger ()` - PullRequest
1 голос
/ 21 апреля 2020

Я хочу проверить свою топологию Kafka Streams с помощью модульного теста с использованием kafka-streams-test-utils. Я использую эту библиотеку уже более долгое время и уже построил некоторый абстрактный слой вокруг моих тестов, используя TestNG. Но поскольку я добавил merge(...) в свой поток, я получил следующее исключение:

 org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more

Вот часть, как я строю поток с помощью StreamBuilder из TopologyTestDriver:

// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
    "my-topic-2",
    consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
    (key, value) -> {
        List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
        // Do stuff an fill out the list
        return list;
    })
 .through("tmp-topic");

// Block 2
KStream<MyKey, MyValue>[] branches = stream1
    .merge(stream2)
    ... business stuff

Для создания сообщений на источнике topi c я использую TopologyTestDriver.pipeInput(...), инициализированный с JsonSerDes. Исключение происходит путем приведения ByteArray, но я не знаю, почему ожидаемый параметр ByteArraySerializer - это тот же класс, но из другого модуля, чем загруженный потребляемый класс. Они также могут быть загружены другим ClassLoaders. Но в фоновом режиме нет стека Spring, и все должно работать синхронно.

Я действительно запутался в этом поведении.

Apache Зависимости Кафки имеют версию: 2.0. 1 и я использую openjdk-11 . Можно ли выровнять загрузку классов сериализаторов? Ошибка возникает, только если я выдаю что-то на: my-topi c -2 , остальные топи c слияния работают нормально.

Ответы [ 2 ]

3 голосов
/ 21 апреля 2020

Не видя весь ваш код, я не могу сказать наверняка, но вот то, что, я думаю, могло бы произойти.

Предоставление Serdes с Consumed обеспечивает де-сериализацию только при использовании записей из входная тема; Потоки Кафки не распространяют их по остальной топологии. В любой момент, если Serde требуется снова, Kafka Streams использует те, которые указаны в StreamsConfig. Serdes.ByteArraySerde является значением по умолчанию.

Я бы предложил две вещи:

  1. Использование Produced.with(keySerde, valueSerde) в узлах приемника
  2. Предоставьте Serde для ваш тип через StreamsConfig.

HTH, и дайте мне знать, как все работает.

-Bill

2 голосов
/ 21 апреля 2020

Как упоминалось @bbejeck, вам нужно будет использовать другую версию .through(), ту, которая позволяет переопределять стандартные (ByteArraySerde) serdes, примененные к K, V.

KStream<K,V> through​(java.lang.String topic,
                     Produced<K,V> produced) 

Материализуйте этот поток в topi c и создайте новый KStream из topi c, используя экземпляр Produced для конфигурации key serde, value serde и StreamPartitioner. ... Это эквивалентно вызову to(someTopic, Produced.with(keySerde, valueSerde) и StreamsBuilder # stream (someTopicName, Consumed.with (keySerde, valueSerde)).

...