Я пытался удалить сообщения из моей темы kafka, используя метод удаления записей API администратора клиента Java. Ниже приведены шаги, которые я попробовал
1. I pushed 20000 records to my TEST-DELETE topic
2. Started a console consumer and consumed all the messages
3. Invoked my java program to delete all those 20k messages
4. Started another console consumer with a different group id. This consumer is not receiving any of the deleted messages
Когда я проверял файловую систему, я все еще мог видеть все эти записи 20 КБ, занимающие место на диске. Я намерен навсегда удалить эти записи из файловой системы.
Конфигурация моей темы приведена ниже вместе с настройками server.properties
Topic:TEST-DELETE PartitionCount:4 ReplicationFactor:1 Configs:cleanup.policy=delete
Topic: TEST-DELETE Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: TEST-DELETE Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: TEST-DELETE Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: TEST-DELETE Partition: 3 Leader: 0 Replicas: 0 Isr: 0
log.retention.hours=24
log.retention.check.interval.ms=60000
log.cleaner.delete.retention.ms=60000
file.delete.delay.ms=60000
delete.retention.ms=60000
offsets.retention.minutes=5
offsets.retention.check.interval.ms=60000
log.cleaner.enable=true
log.cleanup.policy=compact,delete
Мой код для удаления указан ниже
public void deleteRecords(Map<String, Map<Integer, Long>> allTopicPartions) {
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
allTopicPartions.entrySet().forEach(topicDetails -> {
String topicName = topicDetails.getKey();
Map<Integer, Long> value = topicDetails.getValue();
value.entrySet().forEach(partitionDetails -> {
if (partitionDetails.getValue() != 0) {
recordsToDelete.put(new TopicPartition(topicName, partitionDetails.getKey()),
RecordsToDelete.beforeOffset(partitionDetails.getValue()));
}
});
});
DeleteRecordsResult deleteRecords = this.client.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = deleteRecords.lowWatermarks();
lowWatermarks.entrySet().forEach(entry -> {
try {
logger.info(entry.getKey().topic() + " " + entry.getKey().partition() + " "
+ entry.getValue().get().lowWatermark());
} catch (Exception ex) {
}
});
}
Вывод моей Java-программы приведен ниже
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 1 5000
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 0 5000
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 3 5000
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 2 5000
Мое намерение - удалить использованные записи из файловой системы, так как я работаю с ограниченным хранилищем для моего брокера kafka.
Я бы хотел получить помощь, если у меня возникли следующие сомнения
- У меня сложилось впечатление, что удаление записей также удалит сообщения из файловой системы, но, похоже, я ошибся !!
- Как долго эти удаленные записи будут присутствовать в каталоге журналов?
- Есть ли какая-либо конкретная конфигурация, которую мне нужно использовать для удаления записей из файловой системы после вызова API удаления записей?
Ценю вашу помощь
Спасибо