Вот наша текущая реализация, чтобы сделать именно то, что вы запрашиваете, но лучше (или мы так думаем).Тем не менее, этот код нуждается в некоторой тяжелой очистке.Это функциональная версия 0.1, однако.
public class WorkerRole : RoleEntryPoint
{
public override void Run()
{
var logic = new WorkerAgent();
logic.Go(false);
}
public override bool OnStart()
{
// Initialize our Cloud Storage Configuration.
AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);
return base.OnStart();
}
}
public class WorkerAgent
{
private const int _resistance_to_scaling_larger_queues = 9;
private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
{
{typeof (Queue1.Processor), 1},
{typeof (Queue2.Processor), 1},
{typeof (Queue3.Processor), 1},
{typeof (Queue4.Processor), 1},
};
private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
protected TimeSpan CurrentDelay { get; set; }
public Func<string> GetSpecificQueueTypeToProcess { get; set; }
/// <summary>
/// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
/// </summary>
public Dictionary<Type, int> QueueWeights
{
get
{
return _queueWeights;
}
set
{
_queueWeights = value;
}
}
public static TimeSpan QueueWeightCalibrationDelay
{
get { return TimeSpan.FromMinutes(15); }
}
protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();
protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }
public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
{
CurrentDelay = _minDelay;
GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
}
protected IProcessQueues CurrentProcessor { get; set; }
/// <summary>
/// Processes queue request(s).
/// </summary>
/// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
public void Go(bool onlyProcessOnce)
{
if (onlyProcessOnce)
{
ProcessOnce(false);
}
else
{
ProcessContinuously();
}
}
public void ProcessContinuously()
{
while (true)
{
// temporary hack to get this started.
ProcessOnce(true);
}
}
/// <summary>
/// Attempts to fetch and process a single queued request.
/// </summary>
public void ProcessOnce(bool shouldDelay)
{
PopulateQueueMetaData(QueueWeightCalibrationDelay);
if (shouldDelay)
{
Thread.Sleep(CurrentDelay);
}
var typesToPickFrom = new List<Type>();
foreach(var item in QueueWeights)
{
for (var i = 0; i < item.Value; i++)
{
typesToPickFrom.Add(item.Key);
}
}
var randomIndex = (new Random()).Next()%typesToPickFrom.Count;
var typeToTryAndProcess = typesToPickFrom[randomIndex];
CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
CleanQueueDelays();
if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
{
var errors = CurrentProcessor.Go();
var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
? _maxDelay // the queue was empty
: _minDelay; // else
QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
}
else
{
ProcessOnce(true);
}
}
/// <summary>
/// This method populates/refreshes the QueueMetaData collection.
/// </summary>
/// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
{
if (QueueMetaData == null)
{
QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
}
var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
var results = new Dictionary<Type, AzureQueueMetaData>();
foreach (var queueProcessorType in queuesWithoutMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
? 1
: (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
}
}
}
foreach (var queueProcessorType in expiredQueueMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
}
}
}
QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
}
private void CleanQueueDelays()
{
QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
}
}
С этим у нас есть отдельный класс, который знает, как обрабатывать каждую очередь, и он реализует IProcessQueues.Мы загружаем коллекцию _queueWeights
каждым из тех типов, которые мы хотим обработать.Мы устанавливаем константу _resistance_to_scaling_larger_queues
, чтобы контролировать, как мы хотим, чтобы это масштабировалось.Обратите внимание, что это масштабируется логарифмически (см. Метод PopulateQueueMetaData
).Ни одна очередь не имеет веса менее 1, даже если она содержит 0 элементов.Если вы установите PopulateQueueMetaData
на 10
, то для каждого увеличения величины порядка 10 вес этого типа будет увеличиваться на 1. Например, если у вас есть QueueA с 0 элементами, QueueB с 0 элементами иQueueC с 10 элементами, тогда ваши соответствующие веса равны 1, 1 и 2. Это означает, что QueueC с 50% вероятностью будет обработан следующим, в то время как QueueA и QueueB каждый имеют только 25% вероятности обработки.Если в QueueC 100 предметов, то ваш вес равен 1, 1, 3, а ваши шансы на обработку составляют 20%, 20%, 60%.Это гарантирует, что ваши пустые очереди не будут забыты.
Другая вещь, которую это делает, состоит в том, что она имеет _minDelay
и _maxDelay
.Если этот код считает, что в очереди есть хотя бы 1 элемент, он будет обрабатывать его так же быстро, как и со скоростью _minDelay
.Однако, если в последний раз в нем было 0 элементов, он не будет обрабатываться быстрее, чем скорость _maxDelay
.Таким образом, это означает, что если генератор случайных чисел поднимает очередь (независимо от веса), в которой есть 0 элементов, он просто пропустит попытку обработки и перейдет к следующей итерации.(В эту часть можно внести некоторую дополнительную оптимизацию для повышения эффективности транзакций с хранилищем, но это небольшое небольшое дополнение.)
У нас есть пара пользовательских классов (например, AzureQueue
и AzureQueueMetaData
) -один по сути является оберткой для CloudQueue
, а другой хранит некоторую информацию, такую как приблизительный счетчик очереди - там ничего интересного (просто способ упростить код).
Опять же, я надеваюНе называйте этот «красивый» код, но некоторые довольно умные концепции реализованы и функциональны в этом коде.Используйте его по любой причине.:)
Наконец, написание такого кода позволяет нам иметь один проект, который может обрабатывать МНОЖЕЕ больше очередей.Если мы обнаружим, что это просто не идет в ногу, мы можем легко масштабировать его до большего числа экземпляров, и это масштабируется для ВСЕХ очередей.В минимальном сценарии вы можете развернуть один экземпляр этого для мониторинга 3 очередей.Однако, если четвертая очередь начинает влиять на производительность (или вам нужна более высокая доступность), увеличьте ее до 2-х экземпляров.Как только вы попали в 15 очередей, добавьте третью.25 очередей, добавить 4-й экземпляр.Заведите нового клиента и вам нужно обрабатывать МНОГО запросов очереди во всех системах, это нормально.Вращайте эту одну роль до 20 мгновений, пока не закончите, а затем вращайте их обратно вниз.Есть особенно неприятная очередь?Прокомментируйте эту очередь из коллекции _queueWeights
, разверните ее для управления остальными вашими очередями, затем повторно разверните ее со всеми другими очередями, кроме этой, закомментированной из коллекции _queueWeights
, а затем снова разверните ее в другом наборе экземпляров.и выполняйте отладку без а) вмешательства других QueueProcessors в вашу отладку и б) вмешательства в ваши другие QueueProcessors.В конечном счете, это обеспечивает МНОГО гибкости и эффективности.