QueueUtil порождает слишком много потоков для MessageHandlers, проблема async / await - PullRequest
1 голос
/ 26 марта 2019

ОБНОВЛЕНИЕ: Ошибка при использовании Action<QueueMessage> onSuccess для актуальной задачи и вызове ее без await.На самом деле мы передаем async обратный вызов как onSuccess, и поэтому он не блокируется, а просто запускается, не дожидаясь его.Изменение типа на Func<QueueMessage, Task> и вызов через await onSuccess(messageObj) исправили поведение.

Я получаю сообщения, и каждое сообщение обрабатывается длительным заданием, включающим операции ввода-вывода и загрузки процессора.Моя проблема в том, что мой зарегистрированный обработчик сообщений вызывается слишком много раз (в основном столько раз, сколько сообщений), что заставляет мой процесс съедать 100% ресурсов ЦП, и все ужасно замедляется.

Iесть обертка вокруг QueueClient, и я думаю, что у меня возникли проблемы из-за этого.Я понимаю, что мой код неверен, но мне не хватает понимания того, как асинхронизация / ожидание работает в .NET, чтобы исправить это самостоятельно.

Вот код оболочки, он получает либо сообщение json, либо ссылку на хранилище Azure.поэтому он обрабатывает эту логику и передает ее обратному вызову.

public static void RegisterReceiver(string queueName,
        Action<QueueMessage> onSuccess, Action<ExceptionReceivedEventArgs> onError)
        _queueClient.RegisterMessageHandler(
                async (message, cancellationToken) =>
                {
                    var messageObj = new QueueMessage(message);
                    var regex = new Regex(@"\[blob%([^\]]+)\](.+)");
                    if (regex.IsMatch(messageObj.Body))
                    {
                        // Pulling blob data from storage
                        var match = regex.Match(messageObj.Body);
                        messageObj.ContentType = match.Groups[1].Value;
                        messageObj.Body = await BlobUtil.GetAndDeleteBlobAsync(match.Groups[2].Value, settings.QueueBlobStorageName);
                    }
                    onSuccess(messageObj);
                }, 
                new MessageHandlerOptions((exceptionArgs) =>
                {
                    onError(exceptionArgs);
                    return Task.CompletedTask;
                })
                {
                    MaxConcurrentCalls = 1
                }
            );

А вот код вызова с окончательным обратным вызовом:

QueueUtil.RegisterReceiver(QueueUtil.QUEUE_NAME, 
                async message =>
                {
                    await processService.RunJobs(message);
                }, 
                exceptionArgs =>
                {
                    throw new Exception("Exception occurred while receiving message from " + QueueUtil.QUEUE_NAME, 
                        exceptionArgs.Exception);
                }
            );

Так или иначе со всеми этими асинхронными / ожидающими вещамимой приемник просто продолжает порождать потоки, как только приходят сообщения.

1 Ответ

0 голосов
/ 28 марта 2019

Ошибка при использовании Action<QueueMessage> onSuccess для реальной задачи и вызове ее без await.На самом деле мы передаем обратный вызов async как onSuccess, и поэтому он не блокируется, а просто запускается, не дожидаясь его.Изменение типа на Func<QueueMessage, Task> и вызов через await onSuccess(messageObj) исправили поведение.

...