Schema-Registry отклоняет неизмененную схему как несовместимую - PullRequest
0 голосов
/ 25 января 2019

У нас есть кластер kafka, работающий со схемами Avro, которые хранятся в реестре схем Confluent. При недавнем повторном развертывании (одного из) наших потоковых приложений мы начали видеть несовместимые ошибки схемы в одной теме (EmailSent). Это единственная тема с ошибками, и мы получаем ошибку каждый раз, когда в теме фиксируется новое событие EmailSent.

Причина: org.apache.kafka.common.errors.SerializationException: Ошибка регистрации схемы Avro: {"type": "record", "name": "EmailSent", "namespace": "com.company_name.communications .schemas "," fields ": [{" name ":" customerId "," type ":" long "," doc ":" Идентификатор клиента в службе поддержки клиентов "}, {" name ":" messageId "," type ":" long "," doc ":" Идентификатор сообщения отправленной электронной почты "}, {" name ":" sentTime "," type ": {" type ":" string "," avro.java.string ":" String "}," doc ":" Время отправки кампании в формате 'гггг-мм-дд чч: мм: ss.SSS' "}, {" name ":" campaignId "," type ":" long "," doc ":" Идентификатор кампании в комплекте маркетинга "}, {" name ":" appId "," type ": [" null "," long "]," doc ":" Идентификатор приложения связан с отправленным письмом, если письмо было связано с конкретным приложением "," по умолчанию ": null}]," версия ": 1} Вызывается: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: регистрируемая схема несовместима с более ранней схемой; код ошибки: 409; код ошибки: 409 в io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest (RestService.java:170) в io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest (RestService.java:187) на io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema (RestService.java:238) на io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema (RestService.java:230) на io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema (RestService.java:225) в io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId (CachedSchemaRegistryClient.java:59) в io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register (CachedSchemaRegistryClient.java:91) в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:72) на io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:54) в org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send (RecordCollectorImpl.java:91) в org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send (RecordCollectorImpl.java:78) в org.apache.kafka.streams.processor.internals.SinkNode.process (SinkNode.java:87) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.kstream.internals.KStreamFilter $ KStreamFilterProcessor.process (KStreamFilter.java:43) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.kstream.internals.KStreamMap $ KStreamMapProcessor.process (KStreamMap.java:42) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.kstream.internals.KStreamPeek $ KStreamPeekProcessor.process (KStreamPeek.java:44)в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.kstream.internals.KStreamMapValues ​​$ KStreamMapProcessor.process (KStreamMapValues.java:41) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply (ForwardingCacheFlushListener.java:42) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward (CachingKeyValueStore.java:92) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.access $ 000 (CachingKeyValueStore.java:35) в org.apache.kafka.streams.state.internals.CachingKeyValueStore $ 1.apply (CachingKeyValueStore.java:79) в org.apache.kafka.streams.state.internals.NamedCache.flush (NamedCache.java:141) в org.apache.kafka.streams.state.internals.NamedCache.evict (NamedCache.java:232) в org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict (ThreadCache.java:245) в org.apache.kafka.streams.state.internals.ThreadCache.put (ThreadCache.java:153) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal (CachingKeyValueStore.java:193) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.put (CachingKeyValueStore.java:188) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.put (CachingKeyValueStore.java:35) в org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put (InnerMeteredKeyValueStore.java:199) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put (MeteredKeyValueBytesStore.java:121) в org.apache.kafka.streams.kstream.internals.KTableSource $ KTableSourceProcessor.process (KTableSource.java:63) в org.apache.kafka.streams.processor.internals.ProcessorNode $ 1.run (ProcessorNode.java:46) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs (StreamsMetricsImpl.java:211) в org.apache.kafka.streams.processor.internals.ProcessorNode.process (ProcessorNode.java:124) в org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward (ProcessorContextImpl.java:85) в org.apache.kafka.streams.processor.internals.SourceNode.process (SourceNode.java:80) в org.apache.kafka.streams.processor.internals.StreamTask.process (StreamTask.java:222) в org.apache.kafka.streams.processor.internals.AssignedTasks.process (AssignedTasks.java:409) в org.apache.kafka.streams.processor.internals.TaskManager.process (TaskManager.java:308) в org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit (StreamThread.java:939) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:819) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:771) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:741)

Эта схема не изменилась с июня 2018 года, и до этого момента мы успешно обрабатывали события EmailSent.

PR, связанный с развертыванием нашего приложения Streams, не меняет схему, процессор потоков, выбрасывающий ошибки, ни какую-либо из зависимостей приложения потоков.Мое подозрение лежит в схеме-реестре, есть ли у кого-нибудь опыт работы с чем-то подобным или понимание того, что может быть причиной сбоя?Я не смог найти никакой информации по коду ошибки 409, это звонит кому-нибудь в колокола?

Заранее спасибо.

1 Ответ

0 голосов
/ 26 января 2019

Я не думаю, что сервер будет лежать. Вы не показали две ваши схемы, чтобы мы могли их сравнить (одна в реестре и другая в вашем сообщении об ошибке).

Одним из способов решения этой проблемы является настройка конфигурации на НЕТ совместимости,

export KAFKA_TOPIC=logEvents
curl -X PUT http://schema-registry:8081/config/${KAFKA_TOPIC}-value -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"

(сделайте то же самое для ${KAFKA_TOPIC}-key, если вам нужно)

затем загрузите вашу новую схему.

Но

  1. Верните его обратно в обратную совместимость (или в исходную конфигурацию), как только вы закончите
  2. Это может привести к повреждению сообщений, считываемых потребителем Avro, из старой и новой несовместимой схемы.
...