Сообщения Apache Kafka с истекшим сроком действия не удаляются - PullRequest
1 голос
/ 24 октября 2019

У меня есть служба весенней загрузки (2.1.3), публикующая сообщения в теме kafka (2.12-2.3.0). Служба создает тему и позже, после запуска службы, устанавливает для retention.ms значение 1 секунда.

В настоящее время выполняется отладка этого кода

@SpringBootApplication()
@EnableAsync
public class MetricsMsApplication {

  public static void main(String[] args) {
    SpringApplication.run(MetricsMsApplication.class, args);
}

@Bean
public NewTopic topic1() {

    NewTopic nt = new NewTopic("metrics", 10, (short) 1);
    return nt;
}

@EventListener(ApplicationReadyEvent.class)
private void init() throws ExecutionException, InterruptedException {
    Map<String, Object> config = new HashMap<>();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

    AdminClient client = AdminClient.create(config);

    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");

    // Update the retention.ms value
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000");
    Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
    updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
    AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
    alterConfigsResult.all();
}

}

Я отправляю пару сообщений и считаю до 5, затем запускаю консольного потребителя

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic admst-metrics --from-beginning

и по-прежнему получать сообщения, срок действия которых должен был истечь.

Журналы kafka показывают, что была применена конфигурация retention.ms. Я добавил cleanup.policy и установил его для удаления, но в этом нет необходимости, поскольку он используется по умолчанию.

Что приведет к удалению этих сообщений?

1 Ответ

2 голосов
/ 25 октября 2019

короткий ответ - kafka не был разработан для учета таких низких значений срока хранения.

более длинный ответ:

kafka хранит данные для любого (тематического) раздела в сегментных файлах. в любой момент один сегмент является «активным» и записывается, в то время как все более старые сегменты «закрыты». сохранение / сжатие применяется только к неактивным сегментам.

Кафка катит новые сегменты при попадании в log.roll.ms или log.segment.bytes. значения по умолчанию (см. https://kafka.apache.org/documentation/#brokerconfigs) - 7 дней и / или ~ 1 ГБ.

также есть log.segment.delete.delay.ms, который по умолчанию означает, что любой сегмент сохраняется как минимум в течение минуты.

работа по сжатию / удалению неактивных сегментов выполняется потоками средства очистки журналов, которые спят для log.cleaner.backoff.ms (15 секунд), когда работа не найдена, и проверяют, может ли какой-то конкретный сегмент толькоочищаться каждые log.retention.check.interval.ms (5 минут)

В результате все это означает, что значения хранения где-либо рядом с тем, что вы ищете, по умолчанию невозможны.

youЯ могу попробовать настроить все вышеперечисленные значения и посмотреть, как низко вы можете пойти, но я держу пари, что это не будет хорошо масштабироваться для большого количества тем.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...