Я использую Kafka версии 2.0 и API-интерфейс пользователя java для получения сообщений из темы. Мы используем сервер Kafka с одним узлом и одним потребителем на раздел. Я заметил, что потребитель теряет некоторые сообщения.
Сценарий таков:
Потребительские опросы темы.
Я создал One Consumer Per Thread.
Выбирает сообщения и передает их обработчику для обработки сообщения.
Затем он фиксирует смещения, используя семантику Kafka Consumer «как минимум один раз» для фиксации смещения Kafka.
Параллельно у меня работает другой потребитель с другим идентификатором группы. В этом потребителе я просто увеличиваю счетчик сообщений и фиксирую смещение. У этого потребителя нет потери сообщений.
try {
//kafkaConsumer.registerTopic();
consumerThread = new Thread(() -> {
final String topicName1 = "topic-0";
final String topicName2 = "topic-1";
final String topicName3 = "topic-2";
final String topicName4 = "topic-3";
String groupId = "group-0";
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.49:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
try {
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Arrays.asList(topicName1, topicName2, topicName3, topicName4));
} catch (KafkaException ke) {
logTrace(MODULE, ke);
}
while (service.isServiceStateRunning()) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, byte[]> record : partitionRecords) {
processMessage(simpleMessage);
}
}
consumer.commitSync();
}
kafkaConsumer.closeResource();
}, "KAKFA_CONSUMER");
} catch (Exception e) {
}