Используя KafkaTemplate, на который уже ответил Deadpool, существует другой подход, состоящий в том, чтобы просто искать объект в байтовом массиве и отправлять весь объект.
Это не лучший подход, так как его отвергнутая лучшая практика Kafka - распределять и распараллеливать как можно больше. ,Поэтому в общем случае распространяйте сообщения и позволяйте производителю использовать буфер пула и разделы для распараллеливания. Но иногда нам может понадобиться конкретное использование ........
// Вы можете использовать любой список объектов, я просто использую String, но может быть улучшен для любого Object, поскольку String - ничтоно объект..Если вы используете Pojo, его можно преобразовать в строку JSON и передать в виде списка строк json.
public byte[] searlizedByteArray(List<String> listObject) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] inByteArray = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(listObject);
out.flush();
inByteArray = bos.toByteArray();
} finally {
if (bos != null)
bos.close();
}
return inByteArray;
}
Обнулить массив байтов [] в список объектов
public List<String> desearlizedByteArray(byte[] byteArray) throws IOException, ClassNotFoundException {
ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
ObjectInput in = null;
List<String> returnList=null;
try {
in = new ObjectInputStream(bis);
List<String> o = (List<String>) in.readObject();
for (String string : o) {
System.out.println("==="+o);
}
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException ex) {
// ignore close exception
}
}
return returnList;
}
Обратите внимание, что мы используем VALUE_SERIALIZER_CLASS_CONFIG в качестве ByteArraySerializer
public void publishMessage() throws Exception {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
//You might to increase buffer memory and request size in case of large size,
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "");
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "");
Producer<String, byte[]> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(
properties);
try {
List asl=new ArrayList<String>();
asl.add("test1");
asl.add("test2");
byte[] byteArray = searlizedByteArray(asl);
ProducerRecord producerRecord = new ProducerRecord<String, byte[]>("testlist", null,
byteArray);
producer.send(producerRecord).get();
} finally {
if (producer != null) {
producer.flush();
producer.close();
}
}
}
Наконец, в потребительском ByteArrayDeserializer будет работать
public void consumeMessage() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
properties.setProperty("group.id", "grp_consumer");
properties.put("auto.offset.reset", "earliest");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(properties);
consumer.subscribe(Arrays.asList("testlist"));
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records) {
}
}
}