Вы можете использовать GSON, есть способ определить стратегию имени поля.
Gson gson = new GsonBuilder().disableHtmlEscaping().setFieldNamingStrategy(FieldNamingPolicy.UPPER_CAMEL_CASE_WITH_SPACES).create();
, или даже вы можете определить пользовательское.
Gson gson = new GsonBuilder().disableHtmlEscaping().setFieldNamingStrategy(new FieldNamingStrategy() {
@Override
public String translateName(Field f) {
return f.getName().toLowerCase(); //or any logic
}
}).create();
. Для Producer будет работать только StringSerializer
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Предположим, что вы уже заполнили POJO, скажем kafkaRequest, тогда вы можете опубликовать
ProducerRecord producerRecord = new ProducerRecord<String, String>("topicName", null, gson.toJson(kafkaRequest));
На стороне потребителя проанализировать обратно в POJO
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
KafkaRequest kafkaRequest = gson.fromJson(record.value(), KafkaRequest.class);
}
}