Библиотека параллельных задач - Пользовательские планировщики задач - PullRequest
5 голосов
/ 21 марта 2012

У меня есть требование отсылать запросы веб-сервисов к онлайн-API, и я подумал, что Parallel Extensions подойдет для моих нужд.

Рассматриваемая веб-служба предназначена для многократного вызова, но имеет механизм, который взимает плату, если вы набрали определенное количество вызовов в секунду. Я явно хочу минимизировать свои расходы, поэтому мне было интересно, видел ли кто-нибудь TaskScheduler, который может справиться со следующими требованиями:

  1. Ограничить количество задач, запланированных на промежуток времени. Я предполагаю, что если количество запросов превысит этот лимит, тогда нужно будет отбросить задачу или, возможно, заблокировать? (остановить задний журнал задач)
  2. Определите, есть ли в планировщике тот же запрос, который должен быть выполнен, но еще не выполнен, и если нет, поставьте в очередь вторую задачу, а вместо этого верните первую.

Чувствуют ли люди, что с такими обязанностями должен справляться планировщик задач, или я лаю не на том дереве? Если у вас есть альтернативы, я открыт для предложений.

Ответы [ 4 ]

7 голосов
/ 21 марта 2012

Я согласен с другими, что TPL Dataflow звучит как хорошее решение для этого.

Чтобы ограничить обработку, вы можете создать TransformBlock, который фактически не преобразует данные, а просто задерживает их, если они поступили слишком рано после предыдущих данных:

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
                {
                    var waitTime = lastItem + delay - DateTime.UtcNow;
                    if (waitTime > TimeSpan.Zero)
                        await Task.Delay(waitTime);

                    lastItem = DateTime.UtcNow;

                    return x;
                },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}

Затем создайте метод, который производит данные (например, целые числа, начинающиеся с 0):

static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    while (await target.SendAsync(i))
        i++;
}

Он написан асинхронно, поэтому, если целевой блок не может обработать элементы прямо сейчас, он будет ждать.

Затем напишите потребительский метод:

static void Consumer(int i)
{
    Console.WriteLine(i);
}

И, наконец, свяжите все это вместе и запустите:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);

Здесь delayBlock будет принимать не более одного элемента каждые 500 мс, а метод Consumer() может запускаться несколько раз параллельно. Чтобы закончить обработку, позвоните delayBlock.Complete().

Если вы хотите добавить кеширование для вашего # 2, вы можете создать еще одну TransformBlock и выполнить там работу и связать ее с другими блоками.

3 голосов
/ 21 марта 2012

Честно говоря, я бы работал на более высоком уровне абстракции и использовал бы для этого API потока данных TPL. Единственный улов - вам нужно написать собственный блок, который будет регулировать запросы с той скоростью, с которой вам нужно, потому что по умолчанию блоки «жадные» и будут обрабатываться настолько быстро, насколько это возможно. Реализация будет выглядеть примерно так:

  1. Начните с BufferBlock<T>, который является логическим блоком для отправки.
  2. Свяжите BufferBlock<T> с пользовательским блоком, который знает запросы / сек и логику регулирования.
  3. Свяжите пользовательский блок от 2 до вашего ActionBlock<T>.

У меня нет времени, чтобы написать пользовательский блок для # 2 прямо сейчас, но я проверю позже и попытаюсь заполнить реализацию для вас, если вы еще не поняли это.

2 голосов
/ 21 марта 2012

Я не много использовал RX, но AFAICT метод Observable.Window для этого подойдет.

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window(VS.103).aspx

Казалось бы, это лучше, чем Throttle, который, кажется, выбрасывает элементы, что, я думаю, не то, что вы хотите

0 голосов
/ 21 марта 2012

Если вам нужно ограничить время, вы должны проверить Quartz.net .Это может способствовать последовательному опросу.Если вы заботитесь обо всех запросах, вам следует рассмотреть возможность использования какого-либо механизма очередей.MSMQ, вероятно, является правильным решением, но есть много конкретных реализаций, если вы хотите пойти дальше и использовать ESB, такой как NServiceBus или RabbitMQ .

Обновление:

В этом случае TPL Dataflow является вашим предпочтительным решением, если вы можете использовать CTP.Решение - ограниченный буферный блок.

Этот пример взят из документации , предоставленной Microsoft :

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await m_buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await m_buffer.ReceiveAsync());
    }
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Обновление:

Проверьте RX's Observable.Throttle .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...