Как управлять SerializationException в Spring Cloud Stream - PullRequest
0 голосов
/ 11 октября 2019

Я пытаюсь понять, как управлять org.apache.kafka.common.errors.SerializationException в Spring Cloud Stream. Если я хорошо понял, RetryTemplate рассматривается только для ошибок приложения (происходящих в @StreamListener), поэтому даже выполнение пользовательской конфигурации для него бесполезно. Мне интересно, как лучше, так как при конфигурации по умолчанию приложение застряло с бесконечными попытками прочитать сообщение. Например, исключение этого типа при десериализации Avro:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition member.creation_requested-0 at offset 35. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 116
Caused by: org.apache.avro.AvroTypeException: Found string, expecting int
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
    at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:503)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:134)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:747)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.lang.Thread.run(Thread.java:748)

Это мой конфиг:

spring:
  cloud:
    stream:
      bindings:
        accountCreationRequestedSink:
          destination: voxloud.account.creation_requested
          consumer:
            useNativeDecoding: true
      kafka:
        binder:
          autoCreateTopics: false
          brokers: ${KAFKA_BOOTSTRAP_ADDRESSES}
          consumer-properties:
            auto.register.schemas: false
            schema.registry.url: ${KAFKA_SCHEMA_REGISTRY_URL}
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          producer-properties:
            auto.register.schemas: false
            schema.registry.url: ${KAFKA_SCHEMA_REGISTRY_URL}
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
        bindings:
          accountCreationRequestedSink:
            consumer:
              configuration:
                key:
                  deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value:
                  deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...