Я пишу приложение, которое должно уметь читать и отображать данные IoT. С этим кодом у меня работает базовая c функциональность (я удалил некоторые проверки и т. Д. c, чтобы код был короче):
public void Run()
{
_eventHubClient = EventHubClient.CreateFromConnectionString(ConnectionString, "messages/events");
var partitions = _eventHubClient.GetRuntimeInformation().PartitionIds;
cts = new CancellationTokenSource();
var tasks = partitions.Select(partition => ReceiveMessagesFromDeviceAsync(partition, cts.Token));
Task.WaitAll(tasks.ToArray());
}
public void Cancel()
{
cts.Cancel();
}
private async Task ReceiveMessagesFromDeviceAsync(string partition, CancellationToken cancellationToken)
{
var eventHubReceiver = _eventHubClient.GetDefaultConsumerGroup().CreateReceiver(partition, DateTime.UtcNow);
while (true)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
var eventData = await eventHubReceiver.ReceiveAsync(new TimeSpan(0,0,1));
var data = Encoding.UTF8.GetString(eventData.GetBytes());
Console.WriteLine("Message received at {2}. Partition: {0} Data: '{1}'", partition, data, eventData.EnqueuedTimeUtc);
}
}
Моя проблема в том, что мне нужно иметь возможность остановиться и перезапустите соединение снова. Все работает нормально до того момента, когда я запускаю его в 6-й раз, затем я получаю «QuotaExceededException»: «Превышено максимально допустимое число получателей на раздел в группе потребителей, которое равно 5». Я гуглил исключение и понимаю, что проблема в том, что я не знаю, как правильно закрыть предыдущие получатели после закрытия соединения, чтобы я мог открыть его позже. Я пытался вызвать
eventHubReceiver.Close()
в методе Cancel (), но, похоже, это не помогло.
Я был бы очень признателен за любые подсказки, как решить эту проблему, спасибо.