Я зарегистрировал схему в разделе «Физика» в реестре schmema.
curl -X POST \
http://localhost:8081/subjects/physics/versions \
-H 'accept: application/vnd.schemaregistry.v1+json' \
-H 'cache-control: no-cache' \
-H 'content-type: application/vnd.schemaregistry.v1+json' \
-H 'postman-token: 5ce4bab3-6d0c-2940-d7dd-8f901b0b6fec' \
-d '{"schema":"{\"type\":\"record\",\"name\":\"physics\",\"fields\":[{\"name\":\"ATTRIBUTE1\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"ATTRIBUTE2\",\"type\":\"long\",\"default\":0},{\"name\":\"ATTRIBUTE3\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"ATTRIBUTE4\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"ATTRIBUTE5\",\"type\":\"string\",\"default\":\"\"}]}"}'
Код моего производителя Kafka похож на этот.
SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:8081", 20);
Schema schema = schemaRegistryClient.getBySubjectAndId("physics", ID);
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(kafkaProps);
GenericRecord inputRecord = new GenericData.Record(schema);
inputRecord.put("ATTRIBUTE1", "val1");
inputRecord.put("ATTRIBUTE2", System.currentTimeMillis());
inputRecord.put("ATTRIBUTE3", "val3");
ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic,
subjectName, inputRecord);
producer.send(recordData).get();
Совместимость: обратный сериализатор: KafkaAvroSerializer
При работе с Kafka-производителем я получаю исключение ниже.
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string in field ATTRIBUTE4 of physics
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:92)
at kafka.MyKafkaAvroSerializer.serialize(MyKafkaAvroSerializer.java:27)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
at kafka.MyAvroProducerV3.main(MyAvroProducerV3.java:62)
Caused by: java.lang.NullPointerException
at org.apache.avro.io.Encoder.writeString(Encoder.java:121)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:267)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:262)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:128)
Если я отправляю дополнительный атрибут, а именно.ATTRIBUTE6 от Kafka Producer выдает исключение о том, что ATTRIBUTE6 не зарегистрирован.Это совершенно нормально.
Поскольку я зарегистрировал 5 атрибутов (ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5), если я отправляю только 3 атрибута (то есть ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3), потребитель должен получать только эти3 атрибута и значение по умолчанию (например, -1) для отсутствующих.Но почему я получаю выше исключения?
Я что-то упустил?