Кафка отправляет мне сообщения, уже обработанные - PullRequest
0 голосов
/ 04 февраля 2020

У меня есть Java Сервис, который читает сообщения от Кафки. Сервис очень прост. У меня есть слушатель:

@KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.topic.group}", containerFactory = "mesagueKafkaListenerContainerFactory")

Затем у меня есть этот conf:

  @Autowired
  private KafkaProperty kafkaProperty;

  private ConsumerFactory<String, KafkaMessage> mesagueConsumerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaProperty.getGroupIdTest());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(KafkaMessage.class));
  }

  private ConsumerFactory<String, String> consumerFactory(String groupId) {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
    return new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new JsonDeserializer<>(KafkaMessage.class));
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> mesagueKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(mesagueConsumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setErrorHandler(new KafkaErrorHandler());
    return factory;
  }

И затем:

kafka:
  topic:
      name: name
      group: 1
  bootstrap:
    servers: xxx
  autoCommit: false

Когда я отправляю сообщение в очередь , службы обрабатывают его ОК.

Но когда я перезапускаю службу, она снова читает все сообщения из очереди (сообщения уже обработаны)

Я хочу обрабатывать только новые сообщения, которые не обрабатываются.

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 04 февраля 2020

Автоматическое принятие должно быть истинным

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

, а также добавить это в ваши свойства, чтобы оно всегда обрабатывало необработанные данные.

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

, если автоматическое принятие ложно, вы должны совершить вручную. если это не сделано вручную, смещение никогда не увеличится, и вы будете получать одни и те же данные снова и снова после перезапуска.

0 голосов
/ 04 февраля 2020

Вы устанавливаете auto.commit в false, и вы не фиксируете смещения вручную. Это означает, что потребитель не скажет Кафке: «Эй, я закончил употреблять эти сообщения». Итак, в следующий раз, когда вы перезапустите потребителя, он начнёт с последнего принятого сообщения, которое в вашем случае, вероятно, является первым сообщением в топи c. Если вы хотите, чтобы auto.commit имел значение false, вам нужно acknowledge при получении нового сообщения.

@KafkaListener(....)
public void consume(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    // Consume record

    acknowledgment.acknowledge();
}
...