Как просмотреть и установить offsets.retention.minutes, используя kafka-configs - PullRequest
2 голосов
/ 21 июня 2019

Из документации это конфиг брокера, но детали тонкие.

1) Как правильно установить неопределенное значение? Если я это сделаю, смогу ли я удалить группы потребителей и связанные с ними смещения вручную?

2) Могу ли я установить индивидуальные группы потребителей с разным сроком хранения?

3) Как просмотреть удержание в глобальном масштабе или для определенной группы потребителей? Могу ли я просмотреть это и получить номер, даже если я не установил это раньше?

1 Ответ

2 голосов
/ 21 июня 2019

1) Документы Кафки упоминают, что действительные значения для offsets.retention.minutes

[1, ...]

, что означаетчто вы не можете установить offsets.retention.minutes=-1, но вы можете установить его в довольно большое целое число, чтобы сохранить смещения в течение длительного периода.

2) Я бы предположил, что вы спрашиваете, можете ли вы установить различные значения для offsets.retention.minutes на уровне темы;Насколько я знаю, это невозможно.Вы можете изменять эту конфигурацию только на уровне посредника, то есть внутри файла server.properties.

РЕДАКТИРОВАТЬ: Я боюсь, что вы не можете избежать повторного перезапуска, поскольку вы не можете изменить offsets.retention.minutes с помощью kafka-configs.sh;

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config offsets.retention.minutes=200800

Error while executing config command requirement failed: Unknown Dynamic Configuration 'retention.minutes'.
java.lang.IllegalArgumentException: requirement failed: Unknown Dynamic Configuration 'retention.minutes'.
        at scala.Predef$.require(Predef.scala:224)
        at kafka.server.DynamicConfig$$anonfun$kafka$server$DynamicConfig$$validate$1.apply(DynamicConfig.scala:101)
        at kafka.server.DynamicConfig$$anonfun$kafka$server$DynamicConfig$$validate$1.apply(DynamicConfig.scala:100)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at kafka.server.DynamicConfig$.kafka$server$DynamicConfig$$validate(DynamicConfig.scala:100)
        at kafka.server.DynamicConfig$Broker$.validate(DynamicConfig.scala:59)
        at kafka.admin.AdminUtils$.changeBrokerConfig(AdminUtils.scala:555)
        at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:105)
        at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:68)
        at kafka.admin.ConfigCommand.main(ConfigCommand.scala)

3) Поскольку это конфигурация на уровне брокера, вы можете просмотреть значение в файле server.properties или в файлах журналов вашего брокера.Например,

 grep offsets.retention.minutes /path/to/your/kafka-broker.log

Обратите внимание, что даже если вы не настроили offsets.retention.minutes, его значение по умолчанию равно 10080.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...