Кафка создать тему с помощью TopicCommand - PullRequest
0 голосов
/ 08 января 2019

Я хочу создать тему, используя Java. Вот мои коды.

String s = "--topic pt8 --create --zookeeper 10.11.6.52:2181 --replica-assignment 7";
String[] args2 = s.split(" ");
TopicCommand.main(args2);

Но есть ошибка:

[ZkClient-EventThread-14-10.11.6.52: 2181] INFO o.I.z.ZkEventThread - Запуск потока событий ZkClient.

[main] INFO o.I.z.ZkClient - Ожидание состояния хранителя SyncConnected [main-EventThread] INFO o.I.z.ZkClient - состояние zookeeper изменено (SyncConnected)

Ошибка при выполнении команды раздела: java.lang.ExceptionInInitializerError

[ZkClient-EventThread-14-10.11.6.52: 2181] INFO o.I.z.ZkEventThread - Завершить поток событий ZkClient.

--list --zookeeper 10.11.6.52:2181 может получить результаты. --delete --zookeeper 10.11.6.52:2181 --topic pt7 получает Error while executing topic command : null.

My pom.xml :

        <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>

Использование Admin :

ZkClient  zkClient = new ZkClient("10.11.6.52:2181", 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
AdminUtils.createTopic(zkUtils, "pt8", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

ОШИБКА:

Исключение в потоке "main" kafka.admin.AdminOperationException: java.lang.ExceptionInInitializerError

1 Ответ

0 голосов
/ 08 января 2019

Вместо того чтобы использовать команду оболочки и пытаться выполнить ее из JAVA, используйте клиентский API-интерфейс администратора KAFKA, который должен работать с Kafka 0.11+.

Вот фрагмент кода:

void setUpKafkaTopics(KafkaAdminClient kafkaAdminClient) throws ExecutionException, InterruptedException {
  final Map<String, Integer> topics = new HashMap<>();
  topics.put(topicName, numOfPartitions);
  kafkaAdminClient.createTopics(topics, getTopicConfig(), replicationFactor);
}


Map<String, String> getTopicConfig() {
  Map<String, String> topicConfiguration = new HashMap<>();
  topicConfiguration.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
     Boolean.FALSE.toString());
  topicConfiguration.put(TopicConfig.CLEANUP_POLICY_CONFIG,
     TopicConfig.CLEANUP_POLICY_DELETE);
  topicConfiguration.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
     KAFKA_TOPIC_COMPRESSION_TYPE);
  topicConfiguration.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
     KAFKA_TOPIC_MIN_IN_SYNC_REPLICAS.toString()); 
  topicConfiguration.put(TopicConfig.RETENTION_MS_CONFIG,
     KAFKA_TOPIC_RETENTION_MS.toString()); 
  return topicConfiguration;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...