Асинхронность с клиентом Kafka C # - PullRequest
0 голосов
/ 29 марта 2019

Я пытаюсь использовать Kafka для архитектуры pub / sub в проекте .NET Core. Но клиент Confluent Kafka c # предлагает только синхронный метод "Consume" для прослушивания опубликованных сообщений. Вот пример кода, предоставляющий о том, как использовать этот клиент:

var conf = new ConsumerConfig
    { 
        GroupId = "test-consumer-group",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
    c.Subscribe("my-topic");

    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occured: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ensure the consumer leaves the group cleanly and final offsets are committed.
        c.Close();
    }
}

Я пытаюсь сделать этот вызов асинхронным. Я пытался использовать Task.Run и ThreadPool.QueueUserWorkItem. Они оба работают в условиях асинхронности, но я вижу, что они оба используют рабочий поток, где метод "Consume" бездействует в ожидании сообщения. Я хотел бы создать асинхронность без большей части бездействующего потока. Насколько я знаю, структура async/await ставит задачи в очередь и, таким образом, может создавать асинхронность без дополнительной нагрузки на поток. Как это было бы возможно в моем случае?

Я думал, что ThreadPool.QueueUserWorkItem справится с задачей, и только когда сигнализируется Consume, поток из пула потоков будет использоваться для продолжения выполнения оставшегося кода, но я вижу Consume в режиме ожидания в своем собственном потоке.

...