Поведение Singleton в WebJobs с помощью пользовательских процессоров очередей - PullRequest
0 голосов
/ 05 февраля 2019

Моя команда пыталась заставить некоторые из наших пользовательских обработчиков очереди WebJobs иметь поведение Singleton, но мы не получили такого поведения ни с атрибутом [Singleton(Mode = SingletonMode.Listener)], ни с настройкой QueueProcessorFactoryContext.BatchSize = 1,Это приводит к тому, что ночные процессы срывают базу данных одновременно - многие из них по таймауту - и это становится головной болью.

Это более или менее похоже на то, как выглядит наш CustomQueueProcessorFactory:

    public class CustomQueueProcessorFactory : IQueueProcessorFactory
    {
        public QueueProcessor Create(QueueProcessorFactoryContext context)
        {
            if (context == null)
                throw new ArgumentNullException(nameof(context));

            if (context.Queue.Name == Constants.UploadQueueName 
                || context.Queue.Name == Constants.BuildQueueName)
            {
                context.BatchSize = 1;
            }

            return new QueueProcessor(context);
        }
    }

На это ссылаются при настройке нашего JobHost:

    var config = new JobHostConfiguration();
    config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();

Мы также настроили некоторые функции с помощью QueueTrigger s, например:

    public static async Task ExecuteBackgroundRequest([QueueTrigger(Constants.BackgroundQueueName)] BackgroundRequest background, TextWriter logger)
    {
        await ExecuteRequest(background, logger);
    }

    [Singleton(Mode = SingletonMode.Listener)]
    public static async Task ExecuteUploadRequest([QueueTrigger(Constants.UploadQueueName)] BackgroundRequest background, TextWriter logger)
    {
        await ExecuteRequest(background, logger);
    }

    [Singleton(Mode = SingletonMode.Listener)]
    public static async Task ExecuteBuildRequest([QueueTrigger(Constants.BuildQueueName)] BackgroundRequest background, TextWriter logger)
    {
        await ExecuteRequest(background, logger);
    }

Мы используем пакетыMicrosoft.Azure.WebJobs (+ .Core, .Extensions) v2.0.0 и WindowsAzure.Storage v8.0.0, которые несколько устарели, поэтому одно из потенциальных решений, которое я изучал, - это обновление до последней стабильной версии WebJobs (v3).0.4).Это открыло совершенно новую банку червей, так как конфигурация была полностью переделана, и все классы были перемещены.Документация кажется разреженной / разбросанной, поэтому мне еще предстоит определить, где (или даже если) я могу настроить свойства для каждого QueueProcessor, например, установить BatchSize в 1 для некоторых очередей, в то время как для других больше.

Isесть какая-то версия WebJobs, где я могу использовать вышеупомянутую логику CustomQueueProcessorFactory для ограничения BatchSize?Или где атрибут Singleton фактически гарантирует, что только один фоновый процесс одновременно обращается к определенной очереди?Можно ли настроить QueueProcessorFactories в самой последней версии WebJobs?

Помощь по любому из этих вопросов будет чрезвычайно полезна!

1 Ответ

0 голосов
/ 06 февраля 2019

WebJobs SDK облегчает распространенные сценарии распределенной блокировки через его атрибут SingletonAttribute.Вы можете просто применить SingletonAttribute к функции задания, чтобы гарантировать, что все вызовы этой функции будут сериализованы, даже в масштабированных экземплярах.Это полезно, если вашей функции требуется доступ к другим распределенным ресурсам или выполнение других операций, которые не должны / не могут выполняться одновременно.

[Singleton]
public static async Task ProcessImage([BlobTrigger("images")] Stream image)
{
     // Process the image
}

Как и в этом примере, только один экземпляр функции ProcessImage будет работать в любой момент времени.Когда функция запускается новым изображением, добавляемым в контейнер изображений, среда выполнения сначала попытается получить блокировку (аренда BLOB-объектов).После получения блокировка удерживается (и возобновляется аренда BLOB-объектов) на время выполнения функции, что исключает запуск других экземпляров.Если другой экземпляр функции запускается во время работы этой функции, он будет ожидать блокировки, периодически опрашивая ее.

Singleton использует Azure Blob Leases под крышками для реализации распределенной блокировки.Все сложности управления арендой BLOB-объектов, обновлением аренды и т. Д. Обрабатываются SDK.

Детали одиночной блокировки также отображаются на панели инструментов WebJobs, включая текущее состояние блокировки для выполняемой функции,а также как долго функция ожидала блокировки, прежде чем ее получить.Эти сведения можно использовать для просмотра и управления конфликтами блокировок.

enter image description here

Сценарии, не связанные с параллелизмом, потребуют использования Singleton.Некоторые триггеры имеют встроенную поддержку управления параллелизмом через свои параметры конфигурации.В таких случаях вам, вероятно, будет более эффективно использовать встроенную поддержку.Вы можете использовать эти настройки, чтобы убедиться, что ваша функция запускается в одном экземпляре.Чтобы гарантировать, что только один экземпляр функции выполняется среди уменьшенных экземпляров, вы также можете применить к функции блокировку Singleton уровня слушателя (например, [Singleton (Mode = SingletonMode.Listener)]).Ручки конфигурации для некоторых из триггеров:

QueueTrigger - Вы можете установить JobHostConfiguration.Queues.BatchSize в 1 ServiceBusTrigger - Вы можете установить ServiceBusConfiguration.MessageOptions.MaxConcururCallsв 1 FileTrigger - Вы можете установить FileProcessor.MaxDegreeOfParallelism в 1 Однако для триггеров, которые по своей природе не поддерживают управление параллелизмом, или если вы хотите сделать более сложную блокировку с помощью областей Singleton (см. ниже), Singleton - этоправильный путь.

Также я бы сказал, что реализуйте ваш CustomQueueProcessorFactory, как показано ниже, используя двойную блокировку: -

 if (_instance == null)
                {
                    lock (SyncObject)
                    {
                        if (_instance == null)
                        {
                            _instance = new CustomQueueProcessor();
                        }
                    }
                }
                return _instance;
...