Я создаю аврокласс, который содержит строку и карту в виде полей. Я могу сгенерировать класс avro через maven, и мне удалось создать реестр в localhost: 8081
.avs c file:
{
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}
Реестр схемы возвращает это: $ curl -X GET http://localhost: 8081 / субъектов / teste1-значение / версии / 1
{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}
Мой класс Kafka Producer:
public KafkaProducer<String, AvroClass> createKafkaProducer() {
String bootstrapServer = "127.0.0.1:9092";
String schemaRegistryURL = "127.0.0.1:8081";
//create Producer properties
Properties properties = new Properties();
//kafka documentation>producer configs
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);
//create producer
KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
return producer;
}
Но когда работает мой Kafka Producer есть эта ошибка:
Exception in thread "Thread-1" Exception in thread "Thread-3" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)```