Я пытаюсь использовать Kafka для архитектуры pub / sub в проекте .NET Core. Но клиент Confluent Kafka c # предлагает только синхронный метод "Consume
" для прослушивания опубликованных сообщений. Вот пример кода, предоставляющий о том, как использовать этот клиент:
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("my-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
Я пытаюсь сделать этот вызов асинхронным. Я пытался использовать Task.Run
и ThreadPool.QueueUserWorkItem
. Они оба работают в условиях асинхронности, но я вижу, что они оба используют рабочий поток, где метод "Consume
" бездействует в ожидании сообщения. Я хотел бы создать асинхронность без большей части бездействующего потока. Насколько я знаю, структура async/await
ставит задачи в очередь и, таким образом, может создавать асинхронность без дополнительной нагрузки на поток. Как это было бы возможно в моем случае?
Я думал, что ThreadPool.QueueUserWorkItem
справится с задачей, и только когда сигнализируется Consume
, поток из пула потоков будет использоваться для продолжения выполнения оставшегося кода, но я вижу Consume
в режиме ожидания в своем собственном потоке.