Как отбросить поля значений по умолчанию AVRO-данных на стороне потребителя Kafka AVRO? - PullRequest
0 голосов
/ 27 августа 2018

Я определил схему в реестре схем с 10 полями. Использование confluent-3.3.0 , Кафка 0.10

{"schema": "{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}"}

Поля без значения по умолчанию: userName, uID Поля со значением по умолчанию: компания, возраст, номер, первая компания, последняя компания, номер мобильного телефона, адрес электронной почты, год рождения .

Код производителя Kafka приведен ниже. Я передаю только 2 поля userName, uID производителю Kafka, то есть AvroProducer

 public class AvroProducer {
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ZOOKEEPER_HOST", "localhost");
        //props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Schema.Parser parser = new Schema.Parser();
        // I will get below schema string from SCHEMA REGISTRY
        Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}");

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        GenericRecord record = new GenericData.Record(schema);
        record.put("uID", "06080000");
        record.put("userName", "User data10");

        ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
        producer.send(recordData);

        System.out.println("Message Sent");
    }

А код потребителя -

public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("group.id", "consumer1");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
while(true){
    ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
    for (ConsumerRecord<String, GenericRecord> rec : recs) {
        System.out.printf("{AvroConsumer}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());
    }
}
}

При выполнении приведенного выше кода я получаю исключение в конце источника.

 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string in field location of http
    at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:92)
    at kafka.CTKafkaAvroSerializer.serialize(CTKafkaAvroSerializer.java:12)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
    at kafka.AvroProducerWithJSON.main(AvroProducerWithJSON.java:64)
Caused by: java.lang.NullPointerException
    at org.apache.avro.io.Encoder.writeString(Encoder.java:121)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:267)

Чтобы преодолеть указанное выше исключение, я заполнил поля по умолчанию значениями из самой схемы, а именно. компания, возраст, номер, первая компания, последняя компания, номер мобильного телефона, адрес электронной почты, год рождения на конце производителя. Теперь я получаю вывод на потребительском конце, как показано ниже.

{AvroConsumer}: Recieved [key= ip, value= {"userName": "User data10", "uID": "06080000", "company": "ABC", "age": 0, "number": 0,"firstCompany": "","lastCompany": "","mobileNumber": 0,"email": "","birthYear": 1980}]

Итак, я получаю значения по умолчанию для всех других (непреднамеренных) полей. Я хочу избежать этого. Я хочу, чтобы мой вывод содержал только те поля, которые я должен был передать (в данном случае userName, uID ) и отбросить все поля значений по умолчанию.

Будем весьма благодарны за любые указатели вокруг этого.

...