Я определил схему в реестре схем с 10 полями.
Использование confluent-3.3.0 , Кафка 0.10
{"schema": "{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}"}
Поля без значения по умолчанию: userName, uID
Поля со значением по умолчанию: компания, возраст, номер, первая компания, последняя компания, номер мобильного телефона, адрес электронной почты, год рождения .
Код производителя Kafka приведен ниже. Я передаю только 2 поля userName, uID производителю Kafka, то есть AvroProducer
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
//props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}");
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
GenericRecord record = new GenericData.Record(schema);
record.put("uID", "06080000");
record.put("userName", "User data10");
ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
producer.send(recordData);
System.out.println("Message Sent");
}
А код потребителя -
public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("group.id", "consumer1");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
while(true){
ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
for (ConsumerRecord<String, GenericRecord> rec : recs) {
System.out.printf("{AvroConsumer}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());
}
}
}
При выполнении приведенного выше кода я получаю исключение в конце источника.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string in field location of http
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:92)
at kafka.CTKafkaAvroSerializer.serialize(CTKafkaAvroSerializer.java:12)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
at kafka.AvroProducerWithJSON.main(AvroProducerWithJSON.java:64)
Caused by: java.lang.NullPointerException
at org.apache.avro.io.Encoder.writeString(Encoder.java:121)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:267)
Чтобы преодолеть указанное выше исключение, я заполнил поля по умолчанию значениями из самой схемы, а именно. компания, возраст, номер, первая компания, последняя компания, номер мобильного телефона, адрес электронной почты, год рождения на конце производителя. Теперь я получаю вывод на потребительском конце, как показано ниже.
{AvroConsumer}: Recieved [key= ip, value= {"userName": "User data10", "uID": "06080000", "company": "ABC", "age": 0, "number": 0,"firstCompany": "","lastCompany": "","mobileNumber": 0,"email": "","birthYear": 1980}]
Итак, я получаю значения по умолчанию для всех других (непреднамеренных) полей. Я хочу избежать этого. Я хочу, чтобы мой вывод содержал только те поля, которые я должен был передать (в данном случае userName, uID ) и отбросить все поля значений по умолчанию.
Будем весьма благодарны за любые указатели вокруг этого.