У меня есть потоки производителей, помещающие данные в тему Kafka, и один потребительский поток опрашивает каждый элемент за раз. Это потребительская конфигурация:
private final KafkaConsumer<Long, Integer> consumer;
public static final String TOPIC = "requestsss";
{
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
}
И у потребителя есть следующий код, работающий в цикле:
ConsumerRecords<Long, Integer> consumerRecords = consumer.poll(Duration.ofMillis(10000));
consumer.commitSync();
return consumerRecords.records(TOPIC).iterator().next().value();
Затем потребительский процесс останавливается, не использовав все созданные сообщения. Проблема в том, что когда он перезапускается, он не возобновляет процесс с того места, где он ушел; вместо этого он действует так, как если бы все сообщения были использованы при предыдущем запуске.
Кто-нибудь знает, что я делаю не так?
Примечание: я также пытался установить
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
и true и false, а также удаление consumer.commitSync()
, но ни один из них не дал желаемого эффекта.
Спасибо за вашу помощь