Сообщения не удаляются из файловой системы при вызове Java-API deleteRecords Kafka Admin Client. - PullRequest
0 голосов
/ 25 июня 2019

Я пытался удалить сообщения из моей темы kafka, используя метод удаления записей API администратора клиента Java. Ниже приведены шаги, которые я попробовал


    1. I pushed 20000 records to my TEST-DELETE topic
    2. Started a console consumer and consumed all the messages
    3. Invoked my java program to delete all those 20k messages
    4. Started another console consumer with a different group id. This consumer is not receiving any of the deleted messages

Когда я проверял файловую систему, я все еще мог видеть все эти записи 20 КБ, занимающие место на диске. Я намерен навсегда удалить эти записи из файловой системы.

Конфигурация моей темы приведена ниже вместе с настройками server.properties


Topic:TEST-DELETE       PartitionCount:4        ReplicationFactor:1     Configs:cleanup.policy=delete
        Topic: TEST-DELETE    Partition: 0      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 1      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 2      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 3      Leader: 0     Replicas: 0       Isr: 0


    log.retention.hours=24
    log.retention.check.interval.ms=60000
    log.cleaner.delete.retention.ms=60000
    file.delete.delay.ms=60000
    delete.retention.ms=60000
    offsets.retention.minutes=5
    offsets.retention.check.interval.ms=60000
    log.cleaner.enable=true
    log.cleanup.policy=compact,delete

Мой код для удаления указан ниже


public void deleteRecords(Map<String, Map<Integer, Long>> allTopicPartions) {

        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();

        allTopicPartions.entrySet().forEach(topicDetails -> {

            String topicName = topicDetails.getKey();
            Map<Integer, Long> value = topicDetails.getValue();

            value.entrySet().forEach(partitionDetails -> {

                if (partitionDetails.getValue() != 0) {
                    recordsToDelete.put(new TopicPartition(topicName, partitionDetails.getKey()),
                            RecordsToDelete.beforeOffset(partitionDetails.getValue()));
                }
            });
        });

        DeleteRecordsResult deleteRecords = this.client.deleteRecords(recordsToDelete);

        Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = deleteRecords.lowWatermarks();

        lowWatermarks.entrySet().forEach(entry -> {
            try {
                logger.info(entry.getKey().topic() + " " + entry.getKey().partition() + " "
                        + entry.getValue().get().lowWatermark());
            } catch (Exception ex) {

            }
        });

    }

Вывод моей Java-программы приведен ниже



2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 1 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 0 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 3 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 2 5000

Мое намерение - удалить использованные записи из файловой системы, так как я работаю с ограниченным хранилищем для моего брокера kafka.

Я бы хотел получить помощь, если у меня возникли следующие сомнения

  1. У меня сложилось впечатление, что удаление записей также удалит сообщения из файловой системы, но, похоже, я ошибся !!
  2. Как долго эти удаленные записи будут присутствовать в каталоге журналов?
  3. Есть ли какая-либо конкретная конфигурация, которую мне нужно использовать для удаления записей из файловой системы после вызова API удаления записей?

Ценю вашу помощь

Спасибо

1 Ответ

0 голосов
/ 26 июня 2019

Рекомендуемый подход для решения этой проблемы - установить retention.ms и соответствующие значения конфигурации для интересующих вас тем. Таким образом, вы можете определить, как долго Kafka будет хранить ваши данные, пока не удалит их, убедившись, что все ваши нижестоящие потребители имели возможность извлечь данные, прежде чем они будут удалены из кластера Kafk.

Однако, если вы все же хотите принудительно удалить Kafka на основе байтов, есть значения конфигурации log.retention.bytes и retention.bytes. Первый - это параметр для всего кластера, второй - параметр для конкретной темы, который по умолчанию принимает значение, установленное для первого, но вы все равно можете переопределить его для каждой темы. Число retention.bytes применяется для каждого раздела, поэтому вы должны умножить его на общее количество разделов темы.

Имейте в виду, однако, что если у вас есть беглый производитель, который внезапно начинает генерировать много данных, и у вас установлено жесткое ограничение в байтах, вы можете уничтожить данные за весь день в кластере, и оставьте только последние несколько минут данных, возможно, даже прежде, чем действительные потребители смогут извлечь данные из кластера. Вот почему намного лучше настроить темы кафки так, чтобы они сохранялись на основе времени, а не на основе байтов.

Вы можете найти свойства конфигурации и их объяснение в официальных документах Kafka: https://kafka.apache.org/documentation/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...