KafkaAvroSerializer с несколькими URL-адресами реестра Avro - PullRequest
0 голосов
/ 02 ноября 2018

у нас есть KafkaAvroSerde, настроенный с несколькими URL-адресами для авторегистрации. В какой-то момент serde получил тайм-аут при попытке зарегистрировать схему на 1 URL-адресе, но так как он выдал исключение ввода-вывода вплоть до потокового приложения, поток потока закрылся. С точки зрения приложения потока kafka этот вид не поддается цели поддержки возможности нескольких URL-адресов при создании avro-serdes, так как исключение среды выполнения, переполняющее стек API DSL, закроет поток потока. пара вопросов:

  1. Есть ли хороший способ справиться с этим?
  2. Нужно ли вводить повторную попытку в логике приложения (что может быть непросто, когда вы просто материализуете тему в магазине)?
  3. В противном случае есть обертка avroserde, которая
    может повторить попытку с фактической настройкой avroRegistry URL?
  4. При материализации в местный магазин rocksDB добавляется ли
    значение для регистрации схемы в реестре или мы должны настроить auto.register.schemas на false?

>

Exception in thread "mediafirst-npvr-adapter-program-mapping-mtrl02nsbe02.pf.spop.ca-f5e097bd-ff1b-42da-9f7d-2ab9fa5d2b70-GlobalStreamThread" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ProgramMapp
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:68)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
at com.bell.cts.commons.kafka.store.custom.CustomStoreProcessor.process(CustomStoreProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl.forward(GlobalProcessorContextImpl.java:52)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:87)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:282)

1 Ответ

0 голосов
/ 02 ноября 2018

С точки зрения приложения потока kafka этот тип не поддается цели иметь возможность поддерживать несколько URL-адресов при создании avro-сердец, так как исключение во время выполнения, переполняющее стек API DSL, закроет поток потока.

Я не согласен: с точки зрения Kafka Streams сериализация не удалась, и, следовательно, приложение должно быть закрыто. Обратите внимание, что Kafka Streams не зависит от используемых вами Serdes и, следовательно, не знает, что ваш Serde общается с реестром схемы и может повторить попытку.

Таким образом, Serde отвечает за внутреннюю повторную попытку. Я не знаю обертку, которая делает это, но это не должно быть слишком сложно, чтобы построить себя. Я создам внутренний тикет для отслеживания этого запроса. Я думаю, что имеет смысл добавить это для готового опыта.

Для RocksDB: все записи, которые записываются в RocksDB, также записываются в тему журнала изменений. Таким образом, чтобы Kafka Streams мог читать эти данные для восстановления состояния после ошибки, вам необходимо зарегистрировать схемы.

...