Сначала необходимо создать экземпляр KafkaAdminClient
. Следующее должно сделать трюк для вас:
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')
topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
В качестве альтернативы вы можете использовать confluent_kafka
клиент, который представляет собой облегченную оболочку для librdkafka :
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({"bootstrap_servers": "localhost:9092"})
topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)