Архитектура вокруг обработки отдельных и пакетных запросов одновременно - PullRequest
4 голосов
/ 06 февраля 2012

У меня есть служба WCF, размещенная в службе Windows. Эта услуга предоставляет 2 метода:

  1. bool ProcessClaim(string options, ref string xml); Принимает некоторые данные в качестве входных данных, выполняет некоторую обработку (включая операции ввода-вывода, такие как запросы к БД) и возвращает результат обратно.
  2. void RunJob(string ticket); Возвращается немедленно. Согласно ticket, считывает входные данные из хранилища (например, БД или файловой системы), выполняет одинаковую обработку для каждого элемента данных и сохраняет результат обратно в хранилище. Партия обычно состоит из множества претензий.

Пользователи могут вызывать ProcessClaim для обработки отдельных запросов и RunJob для запуска пакетов. Несколько партий могут работать одновременно. Каждый запрос на обработку заключен в Task, поэтому все запросы выполняются параллельно. Проблема не состояла в том, чтобы позволить пакетам переполнить очередь обработки, планируя большое количество запросов. Другими словами, если пользователь выполняет большой пакет, он будет блокировать небольшие пакеты и отдельные запросы на обработку в течение значительного периода времени. Поэтому я придумал следующую схему, хорошо описанную Албахари (очень кратко):

public sealed class ProcessingQueue : IDisposable
{
    private class WorkItem
    {
        public readonly TaskCompletionSource<string> TaskSource;
        public readonly string Options;
        public readonly string Claim;
        public readonly CancellationToken? CancelToken;

        public WorkItem(
            TaskCompletionSource<string> taskSource,
            string options,
            string claim,
            CancellationToken? cancelToken)
        {
            TaskSource = taskSource;
            Options = options;
            Claim = claim;
            CancelToken = cancelToken;
        }
    }

    public ProcessingQueue()
        : this(Environment.ProcessorCount)
    {
    }

    public ProcessingQueue(int workerCount)
    {
        _taskQ = new BlockingCollection<WorkItem>(workerCount * 2);

        for (var i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume);
    }

    public void Dispose()
    {
        _taskQ.CompleteAdding();
    }

    private readonly BlockingCollection<WorkItem> _taskQ;

    public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
    {
        var tcs = new TaskCompletionSource<string>();
        _taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
        return tcs.Task;
    }

    public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
    {
        return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
    }

    private void Consume()
    {
        foreach (var workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
                workItem.TaskSource.SetCanceled();
            else
            {
                try
                {
                    workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }

    private static string ProcessItem(string options, string claim)
    {
        // do some actual work here
        Thread.Sleep(2000); // simulate work;
        return options + claim; // return final result
    }
}

Статический метод ProcessRequest можно использовать для обработки отдельных запросов, тогда как метод экземпляра EnqueueTask - для пакетной обработки. Конечно, все партии должны использовать один общий экземпляр ProcessingQueue. Хотя этот подход работает довольно хорошо и позволяет контролировать скорость одновременной работы нескольких партий, мне кажется, что-то не так:

  • Необходимо поддерживать пул рабочих потоков вручную
  • Трудно угадать оптимальное количество рабочих потоков (по умолчанию я использую количество ядер процессора)
  • Пакет потоков остается заблокированным, когда пакеты не запущены, тратя системные ресурсы
  • Связанные с IO части рабочих потоков блока обработки, снижающие эффективность использования ЦП

Интересно, есть ли лучший способ справиться с этим типом сценариев?

Обновление: Одним из требований является обеспечение полной мощности для пакетов, то есть, когда пользователь выполняет один пакет, а другие входящие запросы отсутствуют, все ресурсы должны быть выделены для обработки этого пакета.

1 Ответ

4 голосов
/ 06 февраля 2012

Я бы сказал, что было бы ошибкой иметь один интерфейс службы и один хост-контейнер для обработки этих двух очень разных типов требований.

Вы должны разделить свою службу на две части: одна возвращает ответы на отдельные запросы по требованию, другие очереди выполняют групповые запросы и обрабатывают их в одном потоке.

Таким образом, вы можете предоставить канал высокой доступности своим потребителям в режиме реального времени и автономный канал вашим массовым потребителям. Они могут быть развернуты и управляться как отдельные задачи, позволяющие предлагать разные уровни обслуживания на каждом интерфейсе службы.

Только мои мысли о предлагаемой архитектуре.

UPDATE

Дело в том, что ваш канал обработки громкости является автономным каналом. Этот вид означает, что потребители должны будут стоять в очереди и ждать, и неопределенное количество времени для их запроса на возврат.

Так как насчет очереди на работу? Каждое задание получает все доступные ресурсы во время обработки. После обработки задания вызывающий абонент получает уведомление о завершении задания.

...