У меня есть служба WCF, размещенная в службе Windows. Эта услуга предоставляет 2 метода:
bool ProcessClaim(string options, ref string xml);
Принимает некоторые данные в качестве входных данных, выполняет некоторую обработку (включая операции ввода-вывода, такие как запросы к БД) и возвращает результат обратно.
void RunJob(string ticket);
Возвращается немедленно. Согласно ticket
, считывает входные данные из хранилища (например, БД или файловой системы), выполняет одинаковую обработку для каждого элемента данных и сохраняет результат обратно в хранилище. Партия обычно состоит из множества претензий.
Пользователи могут вызывать ProcessClaim
для обработки отдельных запросов и RunJob
для запуска пакетов. Несколько партий могут работать одновременно. Каждый запрос на обработку заключен в Task
, поэтому все запросы выполняются параллельно.
Проблема не состояла в том, чтобы позволить пакетам переполнить очередь обработки, планируя большое количество запросов. Другими словами, если пользователь выполняет большой пакет, он будет блокировать небольшие пакеты и отдельные запросы на обработку в течение значительного периода времени.
Поэтому я придумал следующую схему, хорошо описанную Албахари (очень кратко):
public sealed class ProcessingQueue : IDisposable
{
private class WorkItem
{
public readonly TaskCompletionSource<string> TaskSource;
public readonly string Options;
public readonly string Claim;
public readonly CancellationToken? CancelToken;
public WorkItem(
TaskCompletionSource<string> taskSource,
string options,
string claim,
CancellationToken? cancelToken)
{
TaskSource = taskSource;
Options = options;
Claim = claim;
CancelToken = cancelToken;
}
}
public ProcessingQueue()
: this(Environment.ProcessorCount)
{
}
public ProcessingQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
for (var i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Dispose()
{
_taskQ.CompleteAdding();
}
private readonly BlockingCollection<WorkItem> _taskQ;
public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
{
var tcs = new TaskCompletionSource<string>();
_taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
return tcs.Task;
}
public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
{
return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
}
private void Consume()
{
foreach (var workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
workItem.TaskSource.SetCanceled();
else
{
try
{
workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
private static string ProcessItem(string options, string claim)
{
// do some actual work here
Thread.Sleep(2000); // simulate work;
return options + claim; // return final result
}
}
Статический метод ProcessRequest
можно использовать для обработки отдельных запросов, тогда как метод экземпляра EnqueueTask
- для пакетной обработки. Конечно, все партии должны использовать один общий экземпляр ProcessingQueue
. Хотя этот подход работает довольно хорошо и позволяет контролировать скорость одновременной работы нескольких партий, мне кажется, что-то не так:
- Необходимо поддерживать пул рабочих потоков вручную
- Трудно угадать оптимальное количество рабочих потоков (по умолчанию я использую количество ядер процессора)
- Пакет потоков остается заблокированным, когда пакеты не запущены, тратя системные ресурсы
- Связанные с IO части рабочих потоков блока обработки, снижающие эффективность использования ЦП
Интересно, есть ли лучший способ справиться с этим типом сценариев?
Обновление:
Одним из требований является обеспечение полной мощности для пакетов, то есть, когда пользователь выполняет один пакет, а другие входящие запросы отсутствуют, все ресурсы должны быть выделены для обработки этого пакета.