Разница между двумя способами создания потребительской кафки с java пружинной загрузкой - с или без ко - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть два разных метода, которые создают слушатель kafka и читает сообщение из topi c, все на локальной машине, и есть файл properties.yml со свойствами bootstrapAddress и topic_name. Первое использование containerFactory с kafkaListenerContainerFactory в то время как второе использование только @ restController и @ KafkaListener . Оба работают правильно ... В чем главное отличие? Заранее спасибо

1) в @ springbootapplication класс

  public static class MessageListener {
    @KafkaListener(topics = "${greeting.topic.name}", containerFactory = 
  "greetingKafkaListenerContainerFactory")
    public void greetingListener(Greeting greeting) {
        System.out.println("Recieved message: " + greeting);
        //   this.greetingLatch.countDown();
    }
}

с классом:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(groupId));
    return factory;
}



public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

  }

2) просто:

 @RestController
 public class KafkaConsumerController {

@KafkaListener(topics = { "test" })
public void getTopics(@RequestBody String msg) {

    System.out.println("Kafka event consumed is: " + msg);

    JSONObject jsonObject = new JSONObject(msg);
    int id = (int) ( jsonObject.get("id"));

    System.out.println("valore id: " + id);
}
...