удалить тему-сообщения в Apache kafka - PullRequest
1 голос
/ 19 октября 2019

Я тестирую работу kafka-themes, но я не понимаю, как работает удаление.

Я создал простую тему с

retention.ms = 60000

и

segment.ms = 60000

и

cleanup.policy=delete.

После этого я создал продюсера и яотправил несколько сообщений. Потребитель получает сообщения без проблем. Но я ожидаю, что через одну минуту, если пользователь будет повторен, он не покажет сообщения, потому что они должны быть удалены. Но такое поведение не происходит.

Если я создаю запрос в ksql, это то же самое. Сообщения появляются всегда.

Мне кажется, я не понимаю, как работает удаление.

Пример:

1) Тема

./kafka-topics --create --zookeeper localhost:2181 --topic test -- 
  replication-factor 2 --partitions 1 --config "cleanup.policy=delete" -- 
  config "delete.retention.ms=60000" --config "segment.ms=60000"

2) производитель

./kafka-avro-console-producer --broker-list broker:29092 --topic test-- 
  property parse.key=true --property key.schema='{"type":"long"}' --property 
  "key.separator=:" --property value.schema='{"type": "record","name": 
  "ppp","namespace": "test.topic","fields": [{"name": "id","type": "long"}]}'

3) сообщения от производителя

1:{"id": 1}
 2:{"id": 2}
 4:{"id": 4}
 5:{"id": 5}

4) потребитель

  ./kafka-avro-console-consumer \
    --bootstrap-server broker:29092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic test--from-beginning --property print.key=true

потребитель показывает четыре сообщения.

Но я ожидаю, что если я снова запущу потребителя через одну минуту (я тоже ждал больше времени, даже часов), сообщения не будут отображаться, потому что retention.ms и plot.ms составляют одну минуту.

Когда сообщения действительно удаляются?

Ответы [ 2 ]

1 голос
/ 20 октября 2019

Еще один важный момент, который нужно знать в процессе удаления в Kafka: log segment file:

Темы разделены на разделы, верно? Это то, что допускает параллелизм, масштабирование и т. Д.

Каждый раздел делится на log segments files. Почему? Потому что Кафка пишет данные на диск правильно ...? мы не хотим, чтобы все topic / partition сохранялось в одном огромном файле, но разбиваем его на более мелкие файлы (сегменты).

Разбиение данных на более мелкие файлы имеет много преимуществ,Это действительно связано с вопросом. Можете прочитать больше здесь

Ключевым моментом, на который следует обратить внимание, является следующее:

Политика хранения использует метку времени файла журнала semgnet.

"Сохранение по времени выполняется путем проверки последнего измененного времени (mtime) для каждого файла сегмента журнала на диске. При обычных операциях кластера это время, когда сегмент журнала был закрыт,и представляет собой отметку времени последнего сообщения в файле "

Кафка-определитель , стр. 26)

Версия 0.10.1.0

Время хранения журнала больше не зависит от времени последнего изменения сегментов журнала. Вместо этого он будет основан на самой большой временной отметке сообщений в сегменте журнала.

Это означает, что он просматривается только для закрытых файлов сегмента журнала. Убедитесь, что параметры вашего сегмента config правильные.

0 голосов
/ 19 октября 2019

Измените retention.ms, как упомянуто Аджаем Шриваставой выше, используя kafka-topics --zookeeper localhost:2181 --alter --topic test --config retention.ms=60000, и повторите тест.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...