Kafka Admin: как программно показать и установить время хранения для каждой темы? - PullRequest
0 голосов
/ 20 февраля 2019

Я пытаюсь программно (Java) получить и установить время хранения некоторого набора тем в кластере Kafka.

Невозможно использовать org.apache.kafka.clients.admin.AdminClient.

Есть ли другой способ, кроме утилит командной строки?

Ответы [ 2 ]

0 голосов
/ 21 февраля 2019

Чтобы подражать kafka-configs --entity-type topics --entity-name "topic" --describe, вы должны иметь возможность использовать AdminClient#describeConfigs simlar для этого.

Здесь я отфильтровываю только те конфигурации, которые были явно определены пользователем.Если вы удалите фильтр, вы получите все конфиги уровня по умолчанию и уровня брокера, а также

Optional<List<ConfigEntry>> dynamicTopicConfigEntries;

try {
    // given org.apache.kafka.client.admin.AdminClient
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "topic");

    dynamicTopicConfigEntries = Optional.of(adminClient.describeConfigs(Collections.singletonList(resource))
            .all()
            .thenApply(configMap -> configMap.get(resource).entries()
                    .stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
                    .collect(toList())
            )
            .get());
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException("Unable to get topic description");
}

Аналогично, для этой команды поддерживается флаг --alter (неимейте код, доступный для этого)

Кроме того, KIP-248 - один, чтобы смотреть.

0 голосов
/ 20 февраля 2019

Существует класс kafka.admin.TopicCommand scala, который используется сценарием оболочки kafka-topics из двоичного дистрибутива Kafka:

https://github.com/apache/kafka/blob/a421dd2a26ca140f821cd5be1a4f716cf04beb43/core/src/main/scala/kafka/admin/TopicCommand.scala#L302-L318

Вы можете использовать его, хотя вынужно включить пакет kafka в качестве зависимости от вашего проекта, а не только клиентов Kafka.

Примерно так, если вы используете Kafka 2.1.1, скомпилированный для Scala 2.12:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>

https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.1

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...