Как удалить данные, которые уже были использованы потребителем?Кафка - PullRequest
0 голосов
/ 24 октября 2018

Я делаю репликацию данных в кафке.Но размер файла журнала kafka увеличивается очень быстро.Размер достигает 5 Гб в день.Чтобы решить эту проблему, я хочу немедленно удалить обработанные данные.Я использую метод удаления записи в AdminClient для удаления смещения.Но когда я смотрю на файл журнала, данные, соответствующие этому смещению, не удаляются.

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

Мне не нужны предложения типа (log.retention.hours, log.retention.bytes,log.segment.bytes, log.cleanup.policy = delete)

Поскольку я просто хочу удалить данные, полученные потребителем.В этом решении я также удалил данные, которые не используются.

Какие у вас предложения?

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

Попробуйте это

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

В этом коде вам нужно вызвать entry.getValue().get().lowWatermark(), потому что adminClient.deleteRecords (recordsToDelete) возвращает карту Futures, вам нужно дождаться запуска Future, вызвавполучить ()

0 голосов
/ 24 октября 2018

Вы не сделали ничего плохого.Код, который вы предоставили, работает, и я проверил его.На всякий случай, если я что-то пропустил в вашем коде, мой:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

Я использовал группу: 'org.apache.kafka', имя: 'kafka-clients', версия: '2.0.0 '

Поэтому проверьте, нацелен ли вы на правильный раздел (0 для первого)

Проверьте версию вашего брокера: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html говорит:

Эта операция поддерживается брокерами с версией 0.11.0.0

Создание сообщений из того же приложения, чтобы убедиться, что вы правильно подключены.

Существует еще один вариантвы можете рассмотреть.Использование cleanup.policy = compact Если ваши ключи сообщений повторяются, вы можете воспользоваться этим.Не только потому, что старые сообщения для этого ключа будут автоматически удаляться, но вы можете использовать тот факт, что сообщение с нулевой полезной нагрузкой удаляет все сообщения для этого ключа.Только не забудьте установить delete.retention.ms и min.compaction.lag.ms на достаточно малые значения.В этом случае вы можете использовать сообщение, а затем создать нулевую полезную нагрузку для того же ключа (но будьте осторожны с этим подходом, поскольку таким образом вы можете удалять сообщения (с этим ключом), которые вы не использовали)

...