Кафка - Ошибка в создании класса продюсера в ASP.Net Core - PullRequest
2 голосов
/ 18 июня 2019

Я использую Confluent.Kafka

string kafkaEndpoint = "127.0.0.1:9092";

        string kafkaTopic = "testtopic";

        var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };

        using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
        {
            // Send 10 messages to the topic
            for (int i = 0; i < 10; i++)
            {
                var message = $"Event {i}";
                var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
                Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
            }
        }

Я получаю следующую ошибку времени компиляции:

Producer.ProduceAsync (TopicPartition, Message) 'недоступен из-за егоуровень защиты

Использование ProducerBuilder, например:

var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass>{ Value = myObject}).GetAwaiter().GetResult();

показывает ошибку:

Cannot convert from 'Confluent.Kafka.Message<Confluent.Kafka.Null, MyClass>' to 'Confluent.Kafka.Message<Confluent.Kafka.Null, string>

1 Ответ

1 голос
/ 18 июня 2019

В Confluent.Kafka nuget v1.0.1, Producer класс является внутренним классом, то есть он недоступен. Похоже, вам нужно использовать ProducerBuilder вместо, например. как:

var producerConfig = new Dictionary<string, string> { { "bootstrap.servers", kafkaEndpoint } };

using (var producer = new ProducerBuilder<Null, string>(producerConfig)
    .SetKeySerializer(Serializers.Null)
    .SetValueSerializer(Serializers.Utf8)
    .Build())
{
    // Send 10 messages to the topic
    for (int i = 0; i < 10; i++)
    {
        var message = $"Event {i}";
        var result = producer.ProduceAsync(kafkaTopic, new Message<Null, string>{ Value = message}).GetAwaiter().GetResult();
        Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
    }
}

Похоже, что экземпляр произвольного класса может быть отправлен как (заменить MyClass целевым классом):

        var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass>{ Value = myObject}).GetAwaiter().GetResult();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...