Как определить, существует ли kafka topi c, используя confluent-kafka- python - PullRequest
2 голосов
/ 04 мая 2020

Я использую пакет confluent-kafka- python для взаимодействия с сервером Kafka. Я могу успешно создать для него события topi c и pu sh. Однако моя проблема заключается в том, что я запускаю несколько узлов (работающих в Docker), и если второй экземпляр также пытается создать topi c, я получаю сообщение об ошибке. Мне нужно сначала проверить, существует ли уже topi c, прежде чем создавать новый topi c.

from confluent_kafka.admin import AdminClient, NewTopic
kafka_admin = AdminClient({"bootstrap.servers": server})

# First check here if the topic already exists!
if not topic_exists(topic):  # <-- how to accomplish this?
    new_kafka_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
    results = kafka_admin.create_topics([new_kafka_topic])

Спасибо за любую помощь!

1 Ответ

1 голос
/ 26 мая 2020

У меня была такая же проблема, и я справился с ней следующим образом:

client = AdminClient({"bootstrap.servers": BROKER_URL})
topic_metadata = client.list_topics()
if topic_metadata.topics.get(self.topic_name) is None:
  self.create_topic()
...