У меня есть два разных метода, которые создают слушатель kafka и читает сообщение из topi c, все на локальной машине, и есть файл properties.yml со свойствами bootstrapAddress и topic_name. Первое использование containerFactory с kafkaListenerContainerFactory в то время как второе использование только @ restController и @ KafkaListener . Оба работают правильно ... В чем главное отличие? Заранее спасибо
1) в @ springbootapplication класс
public static class MessageListener {
@KafkaListener(topics = "${greeting.topic.name}", containerFactory =
"greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
System.out.println("Recieved message: " + greeting);
// this.greetingLatch.countDown();
}
}
с классом:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
}
2) просто:
@RestController
public class KafkaConsumerController {
@KafkaListener(topics = { "test" })
public void getTopics(@RequestBody String msg) {
System.out.println("Kafka event consumed is: " + msg);
JSONObject jsonObject = new JSONObject(msg);
int id = (int) ( jsonObject.get("id"));
System.out.println("valore id: " + id);
}