ОБНОВЛЕНИЕ: Ошибка при использовании Action<QueueMessage> onSuccess
для актуальной задачи и вызове ее без await
.На самом деле мы передаем async
обратный вызов как onSuccess, и поэтому он не блокируется, а просто запускается, не дожидаясь его.Изменение типа на Func<QueueMessage, Task>
и вызов через await onSuccess(messageObj)
исправили поведение.
Я получаю сообщения, и каждое сообщение обрабатывается длительным заданием, включающим операции ввода-вывода и загрузки процессора.Моя проблема в том, что мой зарегистрированный обработчик сообщений вызывается слишком много раз (в основном столько раз, сколько сообщений), что заставляет мой процесс съедать 100% ресурсов ЦП, и все ужасно замедляется.
Iесть обертка вокруг QueueClient, и я думаю, что у меня возникли проблемы из-за этого.Я понимаю, что мой код неверен, но мне не хватает понимания того, как асинхронизация / ожидание работает в .NET, чтобы исправить это самостоятельно.
Вот код оболочки, он получает либо сообщение json, либо ссылку на хранилище Azure.поэтому он обрабатывает эту логику и передает ее обратному вызову.
public static void RegisterReceiver(string queueName,
Action<QueueMessage> onSuccess, Action<ExceptionReceivedEventArgs> onError)
_queueClient.RegisterMessageHandler(
async (message, cancellationToken) =>
{
var messageObj = new QueueMessage(message);
var regex = new Regex(@"\[blob%([^\]]+)\](.+)");
if (regex.IsMatch(messageObj.Body))
{
// Pulling blob data from storage
var match = regex.Match(messageObj.Body);
messageObj.ContentType = match.Groups[1].Value;
messageObj.Body = await BlobUtil.GetAndDeleteBlobAsync(match.Groups[2].Value, settings.QueueBlobStorageName);
}
onSuccess(messageObj);
},
new MessageHandlerOptions((exceptionArgs) =>
{
onError(exceptionArgs);
return Task.CompletedTask;
})
{
MaxConcurrentCalls = 1
}
);
А вот код вызова с окончательным обратным вызовом:
QueueUtil.RegisterReceiver(QueueUtil.QUEUE_NAME,
async message =>
{
await processService.RunJobs(message);
},
exceptionArgs =>
{
throw new Exception("Exception occurred while receiving message from " + QueueUtil.QUEUE_NAME,
exceptionArgs.Exception);
}
);
Так или иначе со всеми этими асинхронными / ожидающими вещамимой приемник просто продолжает порождать потоки, как только приходят сообщения.