Потребитель Кафки получает нулевое значение при отправке объекта покупателя - PullRequest
0 голосов
/ 17 февраля 2020

Итак, я хочу реализовать приложение, которое считывает данные из файлов формата json. И я создал объект customer для данных в json. И я хочу отправить эти объекты через kafka topi c. До сих пор я успешно отправил сообщение String производителю потребителю. Но когда я пытаюсь отправить объект, на стороне потребителя, когда я делаю .value (). ToString (). Я получил нулевое значение. Ниже приведен код, который я использовал:

Это производитель:

public class MyProducer {

    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.walmart.net:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "xxxxxxxxx.KafkaJsonSerializer");
        properties.put("acks", "1");
        properties.put("retries", "2");
        properties.put("batch.size", "16384");
        properties.put("linger.ms", "1");
        properties.put("buffer.memory", "33554432");

        KafkaProducer<String, pharmacyData> kafkaProducer = new KafkaProducer<String, pharmacyData>(
                properties);

        String topic = "insights";

        //try {
            Gson gson = new Gson();

            Reader reader = Files.newBufferedReader(Paths.get("......./part.json"));

            List<pharmacyData> pdata = new Gson().fromJson(reader, new TypeToken<List<pharmacyData>>() {}.getType());

            //pdata.forEach(System.out::println);

            reader.close();
        //} catch (Exception e) {
            //e.printStackTrace();
        //}

        for (pharmacyData data : pdata) {
            kafkaProducer.send(new ProducerRecord<String, pharmacyData>(topic, data), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println(recordMetadata.partition() + "--" + recordMetadata.serializedValueSize());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

Это класс объекта клиента:

public class pharmacyData {
    private String load_date;
    private String store_nbr;
    private String state;
    private String pmp_flag;
    private String zero_flag;
    private String submit_ts;

    public pharmacyData(String load_date, String store_nbr, String state, String pmp_flag, String zero_flag, String submit_ts) {
        this.load_date = load_date;
        this.store_nbr = store_nbr;
        this.state = state;
        this.pmp_flag = pmp_flag;
        this.zero_flag = zero_flag;
        this.submit_ts = submit_ts;
    }

    public String getLoad_date() {
        return load_date;
    }

    public void setLoad_date(String load_date) {
        this.load_date = load_date;
    }

    public String getStore_nbr() {
        return store_nbr;
    }

    public void setStore_nbr(String store_nbr) {
        this.store_nbr = store_nbr;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getPmp_flag() {
        return pmp_flag;
    }

    public void setPmp_flag(String pmp_flag) {
        this.pmp_flag = pmp_flag;
    }

    public String getZero_flag() {
        return zero_flag;
    }

    public void setZero_flag(String zero_flag) {
        this.zero_flag = zero_flag;
    }

    public String getSubmit_ts() {
        return submit_ts;
    }

    public void setSubmit_ts(String submit_ts) {
        this.submit_ts = submit_ts;
    }

    @Override
    public String toString() {
        return "pharmacyData{" +
                "load_date='" + load_date + '\'' +
                ", store_nbr='" + store_nbr + '\'' +
                ", state='" + state + '\'' +
                ", pmp_flag='" + pmp_flag + '\'' +
                ", zero_flag='" + zero_flag + '\'' +
                ", submit_ts='" + submit_ts + '\'' +
                '}';
    }
}

это Serializer клиента:

public class KafkaJsonSerializer implements Serializer {

    private Logger logger = LogManager.getLogger(this.getClass());
    @Override
    public void configure(Map map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, Object o) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsBytes(o);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return retVal;
    }

    @Override
    public void close() {

    }
}

Это клиентский десериализатор:

public class KafkaJsonDeserializer implements Deserializer {


    @Override
    public void configure(Map map, boolean b) {

    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        pharmacyData pdata = null;
        try {
            pdata = mapper.readValue(bytes, pharmacyData.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return pdata;
    }

    @Override
    public void close() {

    }
}

Это потребитель:

public class MyConsumer {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.walmart.net:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "xxxxxxxx.KafkaJsonDeserializer");
        properties.put("group.id", "consumer-group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");

        KafkaConsumer<String, pharmacyData> consumer = new KafkaConsumer<>(properties);

        String topic = "insights";

        consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, pharmacyData> consumerRecords = consumer.poll(100);

                for (ConsumerRecord<String, pharmacyData> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "--" + consumerRecord.toString());
                    //System.out.println(consumerRecord.offset() + "--" + consumerRecord.partition());
                }
            }
    }
}

Может кто-нибудь помочь мне с вопросами? Большое спасибо!

1 Ответ

0 голосов
/ 17 февраля 2020

Проблема решена:

Для решения этой проблемы просто добавьте конструктор по умолчанию, как показано ниже:

public pharmacyData() {

}

См. эту страницу для получения дополнительной информации.

...