Как создать тему Kafka с помощью Confluent.Kafka .Net Client - PullRequest
0 голосов
/ 27 апреля 2018

Похоже, что самый популярный клиент .net для Kafka (https://github.com/confluentinc/confluent-kafka-dotnet) не содержит методов для настройки и создания тем). При вызове Producer.ProduceAsync() тема создается автоматически, но я не могу найти способ настроить разделы, политику хранения и другие параметры.

Я пытался найти какие-либо примеры в Интернете, но все, что я нашел, это просто использовать значения по умолчанию.

Может быть, есть другой клиент .net, который я могу использовать вместо этого?

Ответы [ 3 ]

0 голосов
/ 12 июля 2018

Confluent пока не предоставляют API для создания темы из клиента dot net, однако для этого есть обходной путь.

  1. Набор auto.create.topics.enable = true в конфигурации kafka

  2. используйте var brokerMetadata = producer.GetMetadata(false, topicName); для запроса доступных тем в существующих брокерах, если указанная тема недоступно, тогда kafka создаст тему с указанным именем.

    private static bool CreateTopicIfNotExist(Producer producer, string topicName)
    {
        bool isTopicExist = producer.GetMetadata().Topics.Any(t => t.Topic == topicName);
        if (!isTopicExist)
        {
            //Creates topic if it is not exist; Only in case of auto.create.topics.enable = true is set into kafka configuration
            var topicMetadata = producer.GetMetadata(false, topicName).Topics.FirstOrDefault();
            if (topicMetadata != null && (topicMetadata.Error.Code != ErrorCode.UnknownTopicOrPart || topicMetadata.Error.Code == ErrorCode.Local_UnknownTopic))
                isTopicExist = true;
        }
        return isTopicExist;
    }

Таким образом, вы можете использовать эту работу вокруг, я знаю, что это грязное решение, но, похоже, на данный момент другого пути нет.

0 голосов
/ 25 апреля 2019

Теперь доступно в последней версии клиентской библиотеки Confluent.Kafka .Net.

См .: https://github.com/confluentinc/confluent-kafka-dotnet/blob/b7b04fed82762c67c2841d7481eae59dee3e4e20/examples/AdminClient/Program.cs

        using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
        {
            try
            {
                await adminClient.CreateTopicsAsync(new TopicSpecification[] { 
                    new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
            }
            catch (CreateTopicsException e)
            {
                Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
            }
        }
0 голосов
/ 27 апреля 2018

Confluent.Kafka.AdminClient доступен в версии 1.0.0-experimental-2, но не позволяет создавать темы и т. Д.

Он построен на librdkafka , у которого пока нет API для этого .

Так что сейчас вы должны настроить это на брокере, используя, например, bin\windows\kafka-topics.sh --create ...

...