Задачи продолжения зависают при использовании LimitedConcurrencyLevelTaskScheduler - PullRequest
4 голосов
/ 26 ноября 2011

Я работаю над использованием TPL в C # (.NET 4.0).

Я создал пользовательский API для упрощения создания веб-запросов и загрузки содержимого (асинхронно, с использованием задач продолжения). Эта часть работает нормально.

Проблема, с которой я сталкиваюсь при попытке использовать LimitedConcurrencyLevelTaskScheduler (можно найти в Samples для параллельного программирования и в MSDN документации для задач ) с отложенным созданием задачи. Если вы не знакомы с этим классом, все, что он делает, это ограничивает степень параллелизма задач, запланированных на произвольное число.

По сути, я хочу отложить создание цепочек задач веб-запросов в задачу, запланированную LimitedConcurrencyLevelTaskScheduler, чтобы я мог ограничить количество одновременных загрузок.

Как подсказал мудрец Стивен Туб , при отсрочке создания Task лучше всего сделать так, чтобы ваш API возвращал Func<Task> или Func<Task<TResult>>. Я сделал это.

К сожалению, моя программа зависает после планирования первого набора одновременных задач. Скажем, мои задачи ограничены 4 степенями параллелизма. В этом случае 4 задачи будут запущены, а затем программа зависнет. Задачи никогда не будут выполнены.

Я создал минимальный пример, чтобы просто проиллюстрировать проблему. Я использую чтение файла вместо WebRequest. Я ограничил степень параллелизма до 1.

class Program
{
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");

            FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);

            byte[] buffer = new byte[32];

            Console.WriteLine("Beginning read.");
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
        };
    }

    static void Main()
    {
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);

        int[] range = {1, 2, 3};

        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                var task = getTask();
                task.Wait();
            });
        });

        Task.WaitAll(tasks.ToArray());
    }
}

Чтобы прояснить, что я имею в виду под "зависанием", вот как выглядит вывод.

Opening file.
Beginning read.

И тогда больше ничего не печатается ... навсегда.

Есть какие-нибудь подсказки о том, что происходит?

1 Ответ

5 голосов
/ 26 ноября 2011

Хороший вопрос!

Во-первых, я не уверен, LimitedConcurrencyLevelTaskScheduler является академически правильным решением.Чтобы ограничить число одновременных запросов до N, необходимо заблокировать N задач, что в первую очередь сводит на нет цель использования асинхронных вызовов APM.

Сказав, что это целоенамного проще реализовать, чем альтернативу.Вы должны иметь рабочую очередь и вести учет количества запросов на полеты, а затем создавать рабочие задачи по мере необходимости.Это не тривиально, чтобы получить право, и если число N одновременных запросов будет небольшим, наличие N заблокированных потоков не конец света.

Итак, проблема с вашим кодом состоит в том, что задачи, созданные в других задачахиспользуйте планировщик из родительской задачи.На самом деле это не так для задач, созданных с помощью FromAsync, поскольку они используют базовую реализацию APM и поэтому немного отличаются.

Вы создаете задачи в Main с помощью:

return factory.StartNew( () =>
    {
        var task = getTask();
        task.Wait();
    }
);

factory использует LimitedConcurrencyLevelTaskScheduler( 1 ), поэтому только 1 из этих задач может выполняться одновременно, и эта задача ожидает выполнения задачи, возвращенной из getTask().

Итак, в GetReadTask вы вызываете Task<int>.Factory.FromAsync.Это происходит потому, что FromAsync не учитывает планировщик родительской задачи.

Затем вы создаете продолжение с .ContinueWith(task => fileStream.Close()).Это создает задачу, которая уважает планировщик своего родителя.Поскольку LimitedConcurrencyLevelTaskScheduler уже выполняет задачу (которая в Main заблокирована), продолжение не может быть запущено, и у вас есть тупик.

Решение состоит в том, чтобы запустить продолжение в обычном потоке пула потоков сTaskScheduler.Default.Затем он запускается, и тупик не работает.

Вот мое решение:

static Task QueueReadTask( TaskScheduler ts, int number )
{
    Output.Write( "QueueReadTask( " + number + " )" );

    return Task.Factory.StartNew( () =>
        {
            Output.Write( "Opening file " + number + "." );

            FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );

            byte[] buffer = new byte[ 32 ];

            var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );

            var tClose = tRead.ContinueWith( task =>
                    {
                        Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
                        fileStream.Close();
                    }
                    , TaskScheduler.Default
                );

            tClose.Wait();
        }
        , CancellationToken.None
        , TaskCreationOptions.None
        , ts
    );
}

И Main теперь выглядит так:

static void Main()
{
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );

    int[] range = { 1, 2, 3 };

    var tasks = range.Select( number =>
        {
            var task = QueueReadTask( ts, number );

            return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
        }
    )
    .ToArray();

    Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );

    Task.WaitAll( tasks );

    Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}

Естьобратите внимание на пару вещей:

Перемещение task.Wait() в QueueReadTask делает более очевидным, что вы блокируете задачу.Вы можете удалить вызов FromAsync и продолжение и заменить их обычным синхронным вызовом, поскольку вы все равно блокируете.

Задание, возвращаемое из QueueReadTask, может иметь продолжения.По умолчанию они выполняются в планировщике по умолчанию, поскольку они наследуют планировщик родительской задачи, а не предшествующий.В этом случае нет родительской задачи, поэтому используется планировщик по умолчанию.

...