Как выполнять задачи параллельно, но не более N задач за T секунд? - PullRequest
0 голосов
/ 13 февраля 2020

Мне нужно выполнить много задач параллельно как можно быстрее. Но если моя программа запускает более 30 задач в 1 секунду, она будет заблокирована. Как обеспечить, чтобы задачи выполнялись не более 30 за любой 1-секундный интервал?

Другими словами, мы должны предотвратить запуск новой задачи, если за последний 1-секундный интервал было выполнено 30 задач.

Мое безобразное возможное решение:

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}

Ответы [ 4 ]

0 голосов
/ 14 февраля 2020

Я думаю, что эту проблему можно решить с помощью SemaphoreSlim, ограниченного числом максимальных задач за интервал, а также с помощью Task.Delay, который задерживает выпуск SemaphoreSlim после завершения каждой задачи в течение интервала, равного необходимому интервалу регулирования. Недостатком этой идеи является то, что завершение всей операции будет отложено на период, равный интервалу. Это происходит потому, что задержка применяется после завершения всех задач, хотя она не потребуется для последних. Bellow - это законченное решение, которое также использует Interlocked.Decrement для подсчета количества незавершенных задач и перестает применять задержку для самых последних задач.

public static async Task<IList<TResult>> Process<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int maxTasksPerInterval, TimeSpan interval)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (taskFactory == null) throw new ArgumentNullException(nameof(taskFactory));
    if (maxTasksPerInterval < 1)
        throw new ArgumentOutOfRangeException(nameof(maxTasksPerInterval));
    if (interval < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(interval));

    var semaphore = new SemaphoreSlim(maxTasksPerInterval);
    var incompleteTasksCount = 0;
    var tasks = source.Select(async item =>
    {
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            var task = taskFactory(item);
            var result = await task.ConfigureAwait(false);
            var cnt = Interlocked.Decrement(ref incompleteTasksCount);
            if (cnt < 0 || cnt >= maxTasksPerInterval)
            {
                await Task.Delay(interval).ConfigureAwait(false);
            }
            return result;
        }
        finally
        {
            semaphore.Release();
        }
    }).ToArray();
    Interlocked.Add(ref incompleteTasksCount, tasks.Length);
    return await Task.WhenAll(tasks);
}

Пример использования :

var results = await Process(Enumerable.Range(1, 100), async n =>
{
    await Task.Delay(500); // Simulate some async I/O operation
    return n;
}, maxTasksPerInterval: 30, interval: TimeSpan.FromSeconds(1));
0 голосов
/ 13 февраля 2020

Является ли блокировка из-за какого-либо ограничения сервера / брандмауэра / оборудования или она основана на наблюдении?

Вам следует попытаться использовать BlockingCollection<Task> или аналогичные поточно-безопасные коллекции особенно, если работа ваших задач связана с вводом / выводом. Вы даже можете установить емкость на 30:

var collection = BlockingCollection<Task>(30);

Затем вы можете запустить 2 asyn c метод:

var population = Task.Factory.Start(Populate);
var processing = Task.Factory.Start(Dequeue);
await Task.WhenAll(population, processing);

Task Populate()
{
    foreach (...)
        collection.Add(...);
    collection.CompleteAdding();
}
Task Dequeue
{
    while(!collection.IsComplete)
        await collection.Take();                            //consider using TryTake()
}

Если предел существует из-за некоторого истинного ограничения (должно быть очень редко) изменить Populate () следующим образом:

var stopper = Stopwatch.StartNew();
for (var i = ....)                                          //instead of foreach
{
    if (i % 30 == 0)
    {
        if (stopper.ElapsedMilliseconds < 1000)
            Task.Delay(1000 - stopper.ElapsedMilliseconds); //note that this race condition should be avoided in your code
        stopper.Restart();
    }
    collection.Add(...);
}
collection.CompleteAdding();
0 голосов
/ 13 февраля 2020

Код пользователя никогда не должен контролировать, как задачи планируются напрямую. Во-первых, он не может - управление тем, как выполняются задачи, является задачей TaskScheduler . Когда код пользователя вызывает .Start(), он просто добавляет задачу в очередь пула потоков для выполнения. await выполняет уже выполняемые задачи.

В примерах TaskScheduler показано, как создавать ограниченные планировщики параллелизма, но, опять же, существуют более качественные параметры высокого уровня.

Код вопроса в любом случае не ограничивает задачи, стоящие в очереди, он ограничивает их количество. Они все уже бегут. Это похоже на пакетную предыдущую асинхронную операцию в конвейере, позволяющую только ограниченному количеству сообщений перейти на следующий уровень.

ActionBlock с задержкой

Простым, готовым к использованию способом было бы использование ActionBlock с ограниченным MaxDegreeOfParallelism, чтобы обеспечить не более N одновременных Операции могут выполняться одновременно. Если мы знаем, сколько времени занимает каждая операция, мы могли бы добавить небольшую задержку, чтобы убедиться, что мы не превышаем ограничение газа.

В этом случае 7 одновременно работающих рабочих выполняют 4 запроса в секунду, в общей сложности 28 максимальных запросов в секунду. BoundedCapacity означает, что только до 7 элементов будут сохранены во входном буфере до блоков downloader.SendAsync. Таким образом, мы избегаем переполнения ActionBlock, если операции занимают слишком много времени.

var downloader = new ActionBlock<string>(
        async url => {
            await Task.Delay(250);
            var response=await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity=7 }
);

//Start posting to the downloader
foreach(var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;

ActionBlock с SemaphoreSlim

Другой вариант - объединить это с SemaphoreSlim, который периодически сбрасывается таймером.

var refreshTimer = new Timer(_=>sm.Release(30));

var downloader = new ActionBlock<string>(
        async url => {
            await semaphore.WaitAsync();
            try 
            {
                var response=await httpClient.GetStringAsync(url);
                //Do something with it.
            }
            finally
            {
                semaphore.Release();
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity=5 }
);

//Start the timer right before we start posting 
refreshTimer.Change(1000,1000);
foreach(....)
{

}
0 голосов
/ 13 февраля 2020

Это фрагмент:

var tasks = new List<Task>();

foreach(item in listNeedInsert)
{
    var task = TaskToRun(item);
    tasks.Add(task);

    if(tasks.Count == 100)
    {
        await Task.WhenAll(tasks);
        tasks.Clear();
    }
}

// Wait for anything left to finish
await Task.WhenAll(tasks);

Обратите внимание, что я скорее добавляю задачу в List<Task>(); и после того, как все добавлено, я жду всех в том же List<Task>();

Что вы делаете здесь:

 var tasks = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (timeList.Count <= maxIntervalCount 
        || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount]);

        await task;

блокируется, пока задача не завершает свою работу, таким образом делая этот вызов:

Task.WhenAll(tasks).Wait();

полностью избыточным. Кроме того, эта строка Task.WhenAll(tasks).Wait(); выполняет ненужную блокировку для метода WhenAll.

...