Я пытаюсь понять, как управлять org.apache.kafka.common.errors.SerializationException
в Spring Cloud Stream. Если я хорошо понял, RetryTemplate
рассматривается только для ошибок приложения (происходящих в @StreamListener
), поэтому даже выполнение пользовательской конфигурации для него бесполезно. Мне интересно, как лучше, так как при конфигурации по умолчанию приложение застряло с бесконечными попытками прочитать сообщение. Например, исключение этого типа при десериализации Avro:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition member.creation_requested-0 at offset 35. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 116
Caused by: org.apache.avro.AvroTypeException: Found string, expecting int
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:503)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:134)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:747)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
Это мой конфиг:
spring:
cloud:
stream:
bindings:
accountCreationRequestedSink:
destination: voxloud.account.creation_requested
consumer:
useNativeDecoding: true
kafka:
binder:
autoCreateTopics: false
brokers: ${KAFKA_BOOTSTRAP_ADDRESSES}
consumer-properties:
auto.register.schemas: false
schema.registry.url: ${KAFKA_SCHEMA_REGISTRY_URL}
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
producer-properties:
auto.register.schemas: false
schema.registry.url: ${KAFKA_SCHEMA_REGISTRY_URL}
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
accountCreationRequestedSink:
consumer:
configuration:
key:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
value:
deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer