KTable foreign-key join is not compatible with the Avro Schema Registry
0 votes
/ 16 March 2020

Using Confluent 5.4.1

We ran into a problem with the new KTable foreign-key join when forwarding the joined stream from the joined KTable to another topic.

Schema Registry and Avro serializer artifacts: version 5.4.1; kafka-streams client version: 5.4.1-ccs

After the foreign-key join on the KTables, consuming the stream from the joined KTable with "foreach" works fine, but if the joined stream is forwarded to another topic with "to", a Schema Registry error occurs.

Comparing the schema mentioned in the error with the schema registered in Schema Registry shows that they are exactly the same. There seems to have been a similar issue with Kafka 2.4.0 already: https://issues.apache.org/jira/browse/KAFKA-9390, and the problem still occurs when forwarding to another topic.

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:64)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:236)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
    at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:385)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:560)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:501)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:438)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1591)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:494)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:432)
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:481)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:222)
    ... 57 more
; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:331) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:431) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:423) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:409) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71) ~[kafka-avro-serializer-5.4.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.4.1.jar:na]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.4.1.jar:na]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.4.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:106) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:124) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier$1.process(SubscriptionResolverJoinProcessorSupplier.java:107) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier$1.process(SubscriptionResolverJoinProcessorSupplier.java:60) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:432) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-5.4.1-ccs.jar:na]
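
To double-check the schema comparison described above the trace, one option is to ask Schema Registry whether a candidate schema is compatible with what a subject already holds, without registering anything. Below is a minimal sketch, assuming Java 11+ (for `java.net.http`), a registry at `http://localhost:8081`, and a subject called `orders-enriched-value`. All three are placeholders, not values from this post. It builds a request for the `POST /compatibility/subjects/{subject}/versions/latest` endpoint. Actually sending it is left to the `send` helper.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class CompatibilityCheck {

    // Minimal JSON string escaping so an Avro schema can be embedded as a string value.
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.toString();
    }

    // Schema Registry expects the schema as an escaped string in a "schema" field.
    static String requestBody(String avroSchemaJson) {
        return "{\"schema\":\"" + jsonEscape(avroSchemaJson) + "\"}";
    }

    // The subject is used as-is; this assumes it only contains URL-safe characters.
    static URI compatibilityUri(String registryUrl, String subject) {
        return URI.create(registryUrl + "/compatibility/subjects/" + subject + "/versions/latest");
    }

    static HttpRequest buildRequest(String registryUrl, String subject, String avroSchemaJson) {
        return HttpRequest.newBuilder(compatibilityUri(registryUrl, subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody(avroSchemaJson)))
                .build();
    }

    // Sends the request; the response body carries an "is_compatible" flag.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) {
        // Placeholder values; replace with the real registry URL, subject and schema.
        String schema = "{\"type\":\"string\"}";
        HttpRequest request = buildRequest("http://localhost:8081", "orders-enriched-value", schema);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(requestBody(schema));
    }
}
```

Running the check once per candidate subject can help narrow down which subject actually rejects the schema when the error message alone is ambiguous.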

1 Answer

0 votes
/ 17 March 2020

My bad: the problem was caused by a difference in the schema that had been registered earlier for the same topic by a different compiled module. The error was not clear enough: instead of mentioning the topic from "to", it mentioned the internal topic, which led to the confusion. This is no longer an issue.
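
For anyone hitting the same confusion, it can help to work out which Schema Registry subjects a sink topic maps to and look at what is already registered there. Below is a small sketch, assuming the serializer uses the default `TopicNameStrategy` (subject is `<topic>-key` or `<topic>-value`). The topic name `orders-enriched` is hypothetical, and the `versionsPath` helper only builds the path of the registry's `GET /subjects/{subject}/versions` endpoint.

```java
final class SubjectNames {

    // Under TopicNameStrategy the subject is the topic name plus "-key" or "-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // REST path that lists the versions already registered under a subject.
    static String versionsPath(String subject) {
        return "/subjects/" + subject + "/versions";
    }

    public static void main(String[] args) {
        String sinkTopic = "orders-enriched"; // hypothetical topic passed to "to"
        System.out.println(subjectFor(sinkTopic, true));
        System.out.println(subjectFor(sinkTopic, false));
        System.out.println(versionsPath(subjectFor(sinkTopic, false)));
    }
}
```

If another module has already registered a different schema under the value subject of the sink topic, a registration from the Streams application can fail with the 409 shown in the question.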
