У нас есть кластер kafka, работающий со схемами Avro, которые хранятся в реестре схем Confluent. При недавнем повторном развертывании (одного из) наших потоковых приложений мы начали видеть несовместимые ошибки схемы в одной теме (EmailSent). Это единственная тема с ошибками, и мы получаем ошибку каждый раз, когда в теме фиксируется новое событие EmailSent.
Причина: org.apache.kafka.common.errors.SerializationException: Ошибка регистрации схемы Avro: {"type": "record", "name": "EmailSent", "namespace": "com.company_name.communications .schemas "," fields ": [{" name ":" customerId "," type ":" long "," doc ":" Идентификатор клиента в службе поддержки клиентов "}, {" name ":" messageId "," type ":" long "," doc ":" Идентификатор сообщения отправленной электронной почты "}, {" name ":" sentTime "," type ": {" type ":" string "," avro.java.string ":" String "}," doc ":" Время отправки кампании в формате 'гггг-мм-дд чч: мм: ss.SSS' "}, {" name ":" campaignId "," type ":" long "," doc ":" Идентификатор кампании в комплекте маркетинга "}, {" name ":" appId "," type ": [" null "," long "]," doc ":" Идентификатор приложения связан с отправленным письмом, если письмо было связано с конкретным приложением "," по умолчанию ": null}]," версия ": 1}
Вызывается: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: регистрируемая схема несовместима с более ранней схемой; код ошибки: 409; код ошибки: 409
в io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest (RestService.java:170)
в io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest (RestService.java:187)
на io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema (RestService.java:238)
на io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema (RestService.java:230)
на io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema (RestService.java:225)
в io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId (CachedSchemaRegistryClient.java:59)
в io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register (CachedSchemaRegistryClient.java:91)
в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:72)
на io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:54)
в
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send (RecordCollectorImpl.java:91)
в org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send (RecordCollectorImpl.java:78)
в org.apache.kafka.streams.processor.internals.SinkNode.process (SinkNode.java:87)
в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85)
в org.apache.kafka.streams.kstream.internals.KStreamFilter $ KStreamFilterProcessor.process (KStreamFilter.java:43)
в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46)
в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211)
в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124)
в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85)
в org.apache.kafka.streams.kstream.internals.KStreamMap $ KStreamMapProcessor.process (KStreamMap.java:42)
в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46)
в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211)
в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124)
в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85)
в org.apache.kafka.streams.kstream.internals.KStreamPeek $ KStreamPeekProcessor.process (KStreamPeek.java:44)в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46)
в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211)
в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124)
в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85)
в org.apache.kafka.streams.kstream.internals.KStreamMapValues $ KStreamMapProcessor.process (KStreamMapValues.java:41)
в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46)
в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211)
в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124)
в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85)
в org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply (ForwardingCacheFlushListener.java:42)
в org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward (CachingKeyValueStore.java:92)
в org.apache.kafka.streams.state.internals.CachingKeyValueStore.access $ 000 (CachingKeyValueStore.java:35)
в org.apache.kafka.streams.state.internals.CachingKeyValueStore $ 1.apply (CachingKeyValueStore.java:79)
в org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:141)
в org.apache.kafka.streams.state.internals.NamedCache.evict (NamedCache.java:232)
в org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict (ThreadCache.java:245)
в org.apache.kafka.streams.state.internals.ThreadCache.put (ThreadCache.java:153)
в org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal (CachingKeyValueStore.java:193)
в org.apache.kafka.streams.state.internals.CachingKeyValueStore.put (CachingKeyValueStore.java:188)
в org.apache.kafka.streams.state.internals.CachingKeyValueStore.put (CachingKeyValueStore.java:35)
в org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put (InnerMeteredKeyValueStore.java:199)
в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put (MeteredKeyValueBytesStore.java:121)
в org.apache.kafka.streams.kstream.internals.KTableSource $ KTableSourceProcessor.process (KTableSource.java:63)
в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46)
в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211)
в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124)
в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85)
в org.apache.kafka.streams.processor.internals.SourceNode.process (SourceNode.java:80)
в org.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:222)
в org.apache.kafka.streams.processor.internals.AssignedTasks.process (AssignedTasks.java:409)
в org.apache.kafka.streams.processor.internals.TaskManager.process (TaskManager.java:308)
в org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit (StreamThread.java:939)
в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:819)
в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:771)
в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:741)
Эта схема не изменилась с июня 2018 года, и до этого момента мы успешно обрабатывали события EmailSent.
PR, связанный с развертыванием нашего приложения Streams, не меняет схему, процессор потоков, выбрасывающий ошибки, ни какую-либо из зависимостей приложения потоков.Мое подозрение лежит в схеме-реестре, есть ли у кого-нибудь опыт работы с чем-то подобным или понимание того, что может быть причиной сбоя?Я не смог найти никакой информации по коду ошибки 409, это звонит кому-нибудь в колокола?
Заранее спасибо.