У меня 4 потребителя, 3 из которых находятся на версии 0.10.0.0 kafka-клиента, но один перешел на версию 2.0.0
Когда я вызываю RestService.getId для получения версии моей схемы AVRO, это успешно работает для трёх потребителей на ранней версии, но не работает на версии 2.0.0 — со следующей трассировкой стека.
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 18
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:185)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:104)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:62)
at com.ciscospark.retention.kafkapurgelibrary.avro.deserialize.CompatibleAvroDeserializer.deserialize(CompatibleAvroDeserializer.java:48)
at com.ciscospark.retention.kafkapurgelibrary.avro.deserialize.CompatibleAvroDeserializer.deserialize(CompatibleAvroDeserializer.java:19)
at com.cisco.wx2.kafka.serialization.SparkKafkaDeserializer.deserialize(SparkKafkaDeserializer.java:34)
at com.ciscospark.retention.kafkapurgelibrary.PurgeEventConsumerFactory.lambda$new$1(PurgeEventConsumerFactory.java:80)
at com.cisco.wx2.kafka.serialization.SimpleKafkaDeserializer.deserialize(SimpleKafkaDeserializer.java:22)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1009)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:96)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1186)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1500(Fetcher.java:1035)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:544)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
Вот мой код, который создаёт соединение с реестром схем. Он использует URL-адрес, начинающийся с https.
/**
 * Builds a Schema Registry {@link RestService} client, configuring mutual-TLS
 * when the registry URL is HTTPS.
 *
 * <p>Key material is loaded from {@code avroSchemaRegistryClient.jks} and trust
 * material from {@code avroSchemaRegistryServer.jks} on the classpath; both use
 * {@code avroSchemaRegistryKeyPass} as the store/key password.
 *
 * @return a RestService ready for schema lookups; SSL-enabled iff the URL is https
 * @throws RuntimeException if the SSL context cannot be built (cause preserved)
 */
private RestService getSchemaServiceRestService() {
    // BUG FIX: the original read `this.getAvroSchemaRegistryUrl` (a field access);
    // the getter-style name indicates the method call was intended.
    String avroSchemaRegistryUrl = this.getAvroSchemaRegistryUrl();
    RestService restService = new RestService(avroSchemaRegistryUrl);
    log.info("Avro schema registry URL {}", avroSchemaRegistryUrl);
    if (avroSchemaRegistryUrl.startsWith("https")) {
        // Fail fast with a clear message if either keystore is missing from the
        // classpath, instead of an opaque NPE inside SSLContextBuilder.
        URL keyStore = AvroSerializer.class.getClassLoader()
                .getResource("avroSchemaRegistryClient.jks");
        URL trustStore = AvroSerializer.class.getClassLoader()
                .getResource("avroSchemaRegistryServer.jks");
        if (keyStore == null || trustStore == null) {
            throw new IllegalStateException(
                "Missing keystore resource(s): avroSchemaRegistryClient.jks="
                    + keyStore + ", avroSchemaRegistryServer.jks=" + trustStore);
        }
        SSLContext sslContext;
        try {
            sslContext = SSLContextBuilder.create()
                    .loadKeyMaterial(keyStore,
                            avroSchemaRegistryKeyPass.toCharArray(),
                            avroSchemaRegistryKeyPass.toCharArray())
                    .loadTrustMaterial(trustStore,
                            avroSchemaRegistryKeyPass.toCharArray())
                    .build();
        } catch (Exception e) {
            // Log the exception itself so the failure cause appears in the log,
            // not only in the rethrown wrapper.
            log.error("Exception when creating sslContext for schema registry client", e);
            throw new RuntimeException(
                "Exception when creating sslContext for schema registry client.", e);
        }
        SSLSocketFactory factory = sslContext.getSocketFactory();
        // NOTE(review): setSslSocketFactory only affects requests made directly
        // through this RestService. The stack trace above originates from
        // CachedSchemaRegistryClient inside the deserializer, which builds its
        // OWN RestService from consumer config — configure the SSL keystore/
        // truststore via `schema.registry.ssl.*` consumer properties as well,
        // or the deserializer's registry calls will handshake without this
        // key material. TODO confirm against consumer configuration.
        restService.setSslSocketFactory(factory);
        log.info("Configured SSL for schema registry client");
    }
    return restService;
}
Эта функция завершается успешно, но при первом же вызове RestService.getId я получаю указанное выше исключение.
Кто-нибудь знает, как мне заставить работать потребитель на версии 2.0.0?