Ошибка Serde в процессе Kafka Streams с Spring Cloud Stream - PullRequest
3 голосов
/ 30 марта 2020

Я пытаюсь обработать некоторые записи Kafka с помощью Spring Cloud Stream 3.0.3.RELEASE, но у меня возникают проблемы с настройкой Serdes, которая выдает ошибку, как только запись входит в конвейер потока.

Это трассировка стека:

30-03-2020 19:28:33 ERROR org.apache.kafka.streams.KafkaStreams              [application-local,,,]: stream-client [joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a] All stream threads have died. The instance will be in error state and should be closed.
Exception in thread "joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=moaii.security.pe.incidence.queue, partition=0, offset=18108, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 35 more

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    ... 5 more
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 35 more

Это моя функция:

@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, ?>> joinPriorityData() {
    return incidenceStream -> incidenceStream
            .toStream()
            .filter((key, value) -> filterZoneTypeAndPriority(value))
            .selectKey((key, value) -> value.getsInc())
            .mapValues((readOnlyKey, value) -> stateMapper.toIncidendeState(value, null));
}

и это мое application.yml:

  spring.json.value.default.type: RawAccounting
  spring.cloud.stream:
    function.definition: joinPriorityData
    bindings:
      joinPriorityData-in-0:
        destination: moaii.security.pe.incidence.queue
        consumer.valueSerde: IncidenceItemSerde
      joinPriorityData-out-0:
        destination: moaii.security.pe.incidence.state
        producer.valueSerde: IncidenceItemSerde
    kafka:
      binder:
        configuration:
          auto.commit.interval.ms: 100
          auto.offset.reset: latest
      streams:
        binder:
          applicationId: moaii-cep
          content-type: application/json
          configuration:
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value.serde:  org.springframework.kafka.support.serializer.JsonSerde
              useNativeDecoding: true

Я попробовал много разных конфигураций из множества мест, но любая, похоже, работает. Как вы можете видеть в трассировке стека, значение по умолчанию Serdes игнорируется, и вместо него используется ByteArray.

Я также создал несколько пользовательских serdes, объявив их как bean-компонент и используя в файле конфигурации, как показано, но тот же результат.

В любом случае у меня такое ощущение, что другие конфигурации тоже игнорируются, например, я вижу, как Ktable пропускает некоторые старые записи с нулевым ключом, а затем завершается ошибкой, когда читает первую, а не нулевую, даже когда у меня есть auto.offset.reset: latest

Не могу сказать, если это проблема Кафки или Spring, но не могу ее исправить

Редактировать: В этих журналах Вы можете видеть, как связыватель получает правильный serde для Входящего, но не для исходящего:

31-03-2020 11:54:24 INFO  o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor  [application-local,,,]: Key Serde used for joinPriorityData-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
31-03-2020 11:54:24 INFO  o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor  [application-local,,,]: Value Serde used for joinPriorityData-in-0: com.vcp.moaii.cep.broker.serde.MoaiiSerdes$IncidenceItemSerde

31-03-2020 11:54:27 INFO  o.s.c.stream.binder.kafka.streams.KStreamBinder    [application-local,,,]: Key Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$StringSerde    
31-03-2020 11:54:27 INFO  o.s.c.stream.binder.kafka.streams.KStreamBinder    [application-local,,,]: Value Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$ByteArraySerde

в исходящем, привязка "joinPriorityData-out-0" даже не упоминается, как вы можете видеть

Ответы [ 2 ]

1 голос
/ 31 марта 2020

Интересно, связана ли эта проблема с вашей конфигурацией и методом функции? Попробуйте следующую функцию и конфигурацию и посмотрите, имеет ли это значение.

@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, IncidenceState>> 
  //same as your original code
}

Обратите внимание, что я добавил параметризованный тип для исходящего KStream. Это необходимо для связующего, чтобы правильно вывести типы Serde для использования. Предполагая, что и IncidenceItem, и IncidenceState являются дружественными объектами JSON, вы можете не указывать никаких значений по умолчанию. Однако вам все равно нужно предоставить их, если ваш внутренний лог c должен полагаться на эти Serdes. Ниже приведен модифицированный конфиг. Я удалил ненужные свойства или переставил их.

spring.json.value.default.type: RawAccounting
  spring.cloud.stream:
    function.definition: joinPriorityData
    bindings:
      joinPriorityData-in-0:
        destination: moaii.security.pe.incidence.queue
      joinPriorityData-out-0:
        destination: moaii.security.pe.incidence.state
    kafka:
      streams:
        binder:
          applicationId: moaii-cep
          configuration:
            auto.commit.interval.ms: 100
            auto.offset.reset: latest
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value.serde:  org.springframework.kafka.support.serializer.JsonSerde

Я не уверен, что вам нужны связыватели Kafka и Kafka Streams. Это не выглядит так из настроек, которыми вы поделились. Поэтому перенесли всю конфигурацию в конфигурацию подшивки Kafka Streams.

Проверьте, имеют ли эти изменения какое-либо значение.

1 голос
/ 31 марта 2020

IncidenceState загружается дважды с двумя различными загрузчиками классов. Он загружается один раз с приложением (загрузчик классов системы) и один раз с загрузчиком классов bootstrap.

Это проблема Кафки. Проверьте это https://discuss.kotlinlang.org/t/classloading-error-with-kafka-streams/4547 и это https://youtrack.jetbrains.com/issue/KT-24966

Там есть обходной способ, как это исправить.

...