Вы не сделали ничего плохого.Код, который вы предоставили, работает, и я проверил его.На всякий случай, если я что-то пропустил в вашем коде, мой:
public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
kafkaAdminClient.deleteRecords(deleteMap);
}
Я использовал группу: 'org.apache.kafka', имя: 'kafka-clients', версия: '2.0.0 '
Поэтому проверьте, нацелен ли вы на правильный раздел (0 для первого)
Проверьте версию вашего брокера: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html говорит:
Эта операция поддерживается брокерами с версией 0.11.0.0
Создание сообщений из того же приложения, чтобы убедиться, что вы правильно подключены.
Существует еще один вариантвы можете рассмотреть.Использование cleanup.policy = compact Если ваши ключи сообщений повторяются, вы можете воспользоваться этим.Не только потому, что старые сообщения для этого ключа будут автоматически удаляться, но вы можете использовать тот факт, что сообщение с нулевой полезной нагрузкой удаляет все сообщения для этого ключа.Только не забудьте установить delete.retention.ms и min.compaction.lag.ms на достаточно малые значения.В этом случае вы можете использовать сообщение, а затем создать нулевую полезную нагрузку для того же ключа (но будьте осторожны с этим подходом, поскольку таким образом вы можете удалять сообщения (с этим ключом), которые вы не использовали)