Проверьте наличие темы Кафки программно на Java - PullRequest
0 голосов
/ 24 октября 2018

Как узнать, была ли тема создана в кластере Kafka, программно, без с использованием инструментов CLI, и до того, как пытаться создать тему?

Я столкнулся с проблемой, когда тема не существует, и наше приложение пытается создать несуществующую тему, но уведомляется только через 90 секунд (таймаут метаданных).Я хотел бы знать, существует ли способ узнать, существует ли тема, из кода Java, чтобы мы могли проверить это, прежде чем пытаться отправить сообщение.Думаю, я мог бы взглянуть на код, который использует утилита Kafka CLI, но мне было интересно, может быть, есть API или более простой способ, который я мог упустить.

Ответы [ 2 ]

0 голосов
/ 24 октября 2018

Вы можете использовать метод AdminUtils.topicExists(..) для более старых версий kafka (1.0.0), чтобы проверить, существует ли тема:

    int sessionTimeOutInMs = 15 * 1000;
    int connectionTimeOutInMs = 10 * 1000;
    String zkHost = "localhost:2181";
    ZkClient zkClient = new ZkClient(zkHost, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHost), false);
    System.out.println(AdminUtils.topicExists(zkUtils, "TopicName"));

AdminUtils устарела в последних версиях Kafka.Таким образом, вы можете использовать AdminClient для Кафка 1,0 +:

    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", "localhost:9092");
    AdminClient admin = AdminClient.create(prop);
    boolean topicExists = admin.listTopics().names().get().stream().anyMatch(topicName -> topicName.equalsIgnoreCase("tealium.topic"));
0 голосов
/ 24 октября 2018

Вы можете использовать AdminClient#listTopics(), чтобы проверить, существует ли данная тема, как показано ниже:

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient client = AdminClient.create(props)) {
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(true); // includes internal topics such as __consumer_offsets
    ListTopicsResult topics = client.listTopics(options);
    Set<String> currentTopicList = topics.names().get();
    // do your filter logic here......
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...