СериализацияException при подключении к реестру схемы AVRO - PullRequest
0 голосов
/ 05 ноября 2019

У меня 4 потребителя, 3 из которых находятся на версии 0.10.0.0 kafka-клиента, но один перешел на версию 2.0.0

Когда я вызываю RestService.getId для получения версии моей схемы AVRO, это успешнодля трех потребителей, которые работают в ранней версии, но не работают в версии 2.0.0, с этой трассировкой стека.

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 18
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
    at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:185)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:104)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:62)
    at com.ciscospark.retention.kafkapurgelibrary.avro.deserialize.CompatibleAvroDeserializer.deserialize(CompatibleAvroDeserializer.java:48)
    at com.ciscospark.retention.kafkapurgelibrary.avro.deserialize.CompatibleAvroDeserializer.deserialize(CompatibleAvroDeserializer.java:19)
    at com.cisco.wx2.kafka.serialization.SparkKafkaDeserializer.deserialize(SparkKafkaDeserializer.java:34)
    at com.ciscospark.retention.kafkapurgelibrary.PurgeEventConsumerFactory.lambda$new$1(PurgeEventConsumerFactory.java:80)
    at com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer.deserialize(SimpleKafkaDeserializer.java:22)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1009)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:96)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1186)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1500(Fetcher.java:1035)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:544)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)

Вот мой код, который создал соединение с реестром схемы. Он использует URL-адрес, начинающийся с https

    private RestService getSchemaServiceRestService() {
        String avroSchemaRegistryUrl = this.getAvroSchemaRegistryUrl;
        RestService restService = new RestService(avroSchemaRegistryUrl);
        log.info("Avro schema registry URL {}", avroSchemaRegistryUrl);

        if (avroSchemaRegistryUrl.startsWith("https")) {
            SSLContext sslContext = null;
            try {
                sslContext = SSLContextBuilder.create()
                        .loadKeyMaterial(AvroSerializer.class.getClassLoader().getResource("avroSchemaRegistryClient.jks"),
                                avroSchemaRegistryKeyPass.toCharArray(),
                                avroSchemaRegistryKeyPass.toCharArray())
                        .loadTrustMaterial(AvroSerializer.class.getClassLoader().getResource("avroSchemaRegistryServer.jks"),
                                avroSchemaRegistryKeyPass.toCharArray())
                        .build();
            } catch (Exception e) {
                log.error("Exception when creating sslContext for schema registry client");
                throw new RuntimeException("Exception when creating sslContext for schema registry client.", e);
            }

            SSLSocketFactory factory = sslContext.getSocketFactory();
            restService.setSslSocketFactory(factory);
            log.info("Configured SSL for schema registry client");
        }

        return restService;
    }

Эта функция завершается успешно, но когда я делаю первый вызов RestService.getId, я получаю это исключение.

Кто-нибудь знает, как я могу получить свойпотребитель версии 2.0.0 для работы?

...