Пример многопоточной роли рабочего очереди Azure - PullRequest
1 голос
/ 28 августа 2011

У нас есть 4 очереди Azure, которые заполняются либо прямым REST API, либо предоставляемой нами службой WCF.

  1. Мы хотели бы иметь ОДНУ рабочую роль для мониторинга всех этих 4 очередей.
  2. Я думаю об использовании многопоточности, которая читает имя очереди и т. Д. Из конфигурации и запускает метод процесса (который читает сообщение из очереди и выполняет обработку)

Может кто-нибудь пожалуйстадайте мне пример или руководство о том, как добиться этого в роли рабочего, пожалуйста?

Не слишком уверен, что вышеуказанного можно достичь без многопоточности, так как я новичок в многопоточности.

Спасибо

Ответы [ 3 ]

2 голосов
/ 28 августа 2011

Вы можете запускать разные потоки для разных задач, но также следует учитывать и многопоточный подход (который может работать лучше или хуже в зависимости от того, что вы делаете с сообщениями):

while (true)
{
    var msg = queue1.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue1.DeleteMessage(msg);
    }
    msg = queue2.GetMessage();
    if (msg != null)
    {
        didSomething = true;
        // do something with it
        queue2.DeleteMessage(msg);
    }
    // ...
    if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
}
1 голос
/ 20 сентября 2011

Вот наша текущая реализация, чтобы сделать именно то, что вы запрашиваете, но лучше (или мы так думаем).Тем не менее, этот код нуждается в некоторой тяжелой очистке.Это функциональная версия 0.1, однако.

public class WorkerRole : RoleEntryPoint
{
    public override void Run()
    {
        var logic = new WorkerAgent();
        logic.Go(false);
    }

    public override bool OnStart()
    {
        // Initialize our Cloud Storage Configuration.
        AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);

        return base.OnStart();
    }
}

public class WorkerAgent
{
    private const int _resistance_to_scaling_larger_queues = 9;
    private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
                                                       {
                                                           {typeof (Queue1.Processor), 1},
                                                           {typeof (Queue2.Processor), 1},
                                                           {typeof (Queue3.Processor), 1},
                                                           {typeof (Queue4.Processor), 1},
                                                       };

    private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
    private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
    protected TimeSpan CurrentDelay { get; set; }

    public Func<string> GetSpecificQueueTypeToProcess { get; set; }

    /// <summary>
    /// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
    /// </summary>
    public Dictionary<Type, int> QueueWeights
    {
        get
        {
            return _queueWeights;
        }
        set
        {
            _queueWeights = value;
        }
    }

    public static TimeSpan QueueWeightCalibrationDelay
    {
        get { return TimeSpan.FromMinutes(15); }
    }


    protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();


    protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }

    public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
    {
        CurrentDelay = _minDelay;
        GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
    }

    protected IProcessQueues CurrentProcessor { get; set; }

    /// <summary>
    /// Processes queue request(s).
    /// </summary>
    /// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
    public void Go(bool onlyProcessOnce)
    {
        if (onlyProcessOnce)
        {
            ProcessOnce(false);
        }
        else
        {
            ProcessContinuously();
        }
    }

    public void ProcessContinuously()
    {
        while (true)
        {
            // temporary hack to get this started.
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// Attempts to fetch and process a single queued request.
    /// </summary>
    public void ProcessOnce(bool shouldDelay)
    {
        PopulateQueueMetaData(QueueWeightCalibrationDelay);

        if (shouldDelay)
        {
            Thread.Sleep(CurrentDelay);
        }

        var typesToPickFrom = new List<Type>();
        foreach(var item in QueueWeights)
        {
            for (var i = 0; i < item.Value; i++)
            {
                typesToPickFrom.Add(item.Key);
            }
        }

        var randomIndex = (new Random()).Next()%typesToPickFrom.Count;
        var typeToTryAndProcess = typesToPickFrom[randomIndex];

        CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
        CleanQueueDelays();

        if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
        {
            var errors = CurrentProcessor.Go();

            var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
                               ? _maxDelay // the queue was empty
                               : _minDelay; // else

            QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
        }
        else
        {
            ProcessOnce(true);
        }
    }

    /// <summary>
    /// This method populates/refreshes the QueueMetaData collection.
    /// </summary>
    /// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
    private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
    {
        if (QueueMetaData == null)
        {
            QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
        }

        var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
        var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
        var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
        var results = new Dictionary<Type, AzureQueueMetaData>();

        foreach (var queueProcessorType in queuesWithoutMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);

                    QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
                                                  ? 1
                                                  : (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
                }
            }
        }

        foreach (var queueProcessorType in expiredQueueMetaData)
        {
            if (!results.ContainsKey(queueProcessorType))
            {
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor != null)
                {
                    var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                    var metaData = queue.GetMetaData();
                    results.Add(queueProcessorType, metaData);
                }
            }
        }

        QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
    }

    private void CleanQueueDelays()
    {
        QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
    }
}

С этим у нас есть отдельный класс, который знает, как обрабатывать каждую очередь, и он реализует IProcessQueues.Мы загружаем коллекцию _queueWeights каждым из тех типов, которые мы хотим обработать.Мы устанавливаем константу _resistance_to_scaling_larger_queues, чтобы контролировать, как мы хотим, чтобы это масштабировалось.Обратите внимание, что это масштабируется логарифмически (см. Метод PopulateQueueMetaData).Ни одна очередь не имеет веса менее 1, даже если она содержит 0 элементов.Если вы установите PopulateQueueMetaData на 10, то для каждого увеличения величины порядка 10 вес этого типа будет увеличиваться на 1. Например, если у вас есть QueueA с 0 элементами, QueueB с 0 элементами иQueueC с 10 элементами, тогда ваши соответствующие веса равны 1, 1 и 2. Это означает, что QueueC с 50% вероятностью будет обработан следующим, в то время как QueueA и QueueB каждый имеют только 25% вероятности обработки.Если в QueueC 100 предметов, то ваш вес равен 1, 1, 3, а ваши шансы на обработку составляют 20%, 20%, 60%.Это гарантирует, что ваши пустые очереди не будут забыты.

Другая вещь, которую это делает, состоит в том, что она имеет _minDelay и _maxDelay.Если этот код считает, что в очереди есть хотя бы 1 элемент, он будет обрабатывать его так же быстро, как и со скоростью _minDelay.Однако, если в последний раз в нем было 0 элементов, он не будет обрабатываться быстрее, чем скорость _maxDelay.Таким образом, это означает, что если генератор случайных чисел поднимает очередь (независимо от веса), в которой есть 0 элементов, он просто пропустит попытку обработки и перейдет к следующей итерации.(В эту часть можно внести некоторую дополнительную оптимизацию для повышения эффективности транзакций с хранилищем, но это небольшое небольшое дополнение.)

У нас есть пара пользовательских классов (например, AzureQueue и AzureQueueMetaData) -один по сути является оберткой для CloudQueue, а другой хранит некоторую информацию, такую ​​как приблизительный счетчик очереди - там ничего интересного (просто способ упростить код).

Опять же, я надеваюНе называйте этот «красивый» код, но некоторые довольно умные концепции реализованы и функциональны в этом коде.Используйте его по любой причине.:)

Наконец, написание такого кода позволяет нам иметь один проект, который может обрабатывать МНОЖЕЕ больше очередей.Если мы обнаружим, что это просто не идет в ногу, мы можем легко масштабировать его до большего числа экземпляров, и это масштабируется для ВСЕХ очередей.В минимальном сценарии вы можете развернуть один экземпляр этого для мониторинга 3 очередей.Однако, если четвертая очередь начинает влиять на производительность (или вам нужна более высокая доступность), увеличьте ее до 2-х экземпляров.Как только вы попали в 15 очередей, добавьте третью.25 очередей, добавить 4-й экземпляр.Заведите нового клиента и вам нужно обрабатывать МНОГО запросов очереди во всех системах, это нормально.Вращайте эту одну роль до 20 мгновений, пока не закончите, а затем вращайте их обратно вниз.Есть особенно неприятная очередь?Прокомментируйте эту очередь из коллекции _queueWeights, разверните ее для управления остальными вашими очередями, затем повторно разверните ее со всеми другими очередями, кроме этой, закомментированной из коллекции _queueWeights, а затем снова разверните ее в другом наборе экземпляров.и выполняйте отладку без а) вмешательства других QueueProcessors в вашу отладку и б) вмешательства в ваши другие QueueProcessors.В конечном счете, это обеспечивает МНОГО гибкости и эффективности.

0 голосов
/ 28 августа 2011

Внутри цикла while рабочей роли запустите 4 потока, как будто вы пишете многопоточное приложение на C #.Конечно, вам нужно определить четыре разные функции потока, и эти функции должны иметь отдельные циклы while для опроса очередей.В конце цикла while работника просто дождитесь завершения потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...