Мой код на Java + Spring Boot:
// Template with String keys and String values, injected through Spring's @Autowired
// field injection; produce() uses it to send messages.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
 * Logs the outgoing message together with the target topic, then hands the
 * message to {@code kafkaTemplate.send} for {@code TOPIC}.
 *
 * <p>The value returned by {@code send} is not inspected here.
 *
 * @param message payload passed to {@code kafkaTemplate.send}
 */
public void produce(String message) {
    logger.info(
            "Producer : Kafka Topic -> {}, Kafka Message -> {}",
            TOPIC,
            message);
    this.kafkaTemplate.send(TOPIC, message);
}
/**
 * Listener for {@code TOPIC}: logs the received message, interprets it as an
 * integer and stores the result through {@code setKafkaStatus}.
 *
 * <p>Surrounding whitespace is trimmed before parsing. When the message is
 * {@code null} or is not a valid integer, the status falls back to {@code 0}.
 * Only the parse failure is handled here; an exception thrown by
 * {@code setKafkaStatus} itself is no longer swallowed and mislabelled as a
 * parse error.
 *
 * @param message raw message received by the listener; may be {@code null}
 */
@KafkaListener(topics = TOPIC, groupId = GROUP_ID)
public void consume(String message) {
    // Single log statement through the logger; the duplicate System.out print was dropped.
    logger.info("Consumer : Kafka Message -> {}", message);

    int status = 0; // fallback used for null or non-numeric payloads
    if (message == null) {
        // Explicit check instead of relying on a caught NullPointerException.
        logger.info("Kafka message is not Integer");
    } else {
        try {
            status = Integer.parseInt(message.trim());
        } catch (NumberFormatException e) {
            logger.info("Kafka message is not Integer");
        }
    }
    setKafkaStatus(status);
}
/**
 * Placeholder for closing a connection. The body currently contains no
 * statements, so calling this method does nothing.
 *
 * <p>NOTE(review): which connection is meant (presumably the Kafka one) is not
 * visible here — confirm before implementing.
 */
public void closeConnection() {
// TODO: code for closing the connection goes here
}