Java Kafka потребитель не возобновляет с того места, где он ушел - PullRequest
0 голосов
/ 10 апреля 2019

У меня есть потоки производителей, помещающие данные в тему Kafka, и один потребительский поток опрашивает каждый элемент за раз. Это потребительская конфигурация:

    private final KafkaConsumer<Long, Integer> consumer;

    public static final String TOPIC = "requestsss";

    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

    }

И у потребителя есть следующий код, работающий в цикле:

  ConsumerRecords<Long, Integer> consumerRecords = consumer.poll(Duration.ofMillis(10000));
  consumer.commitSync();
  return consumerRecords.records(TOPIC).iterator().next().value();

Затем потребительский процесс останавливается, не использовав все созданные сообщения. Проблема в том, что когда он перезапускается, он не возобновляет процесс с того места, где он ушел; вместо этого он действует так, как если бы все сообщения были использованы при предыдущем запуске.

Кто-нибудь знает, что я делаю не так?

Примечание: я также пытался установить

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

и true и false, а также удаление consumer.commitSync(), но ни один из них не дал желаемого эффекта.

Спасибо за вашу помощь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...