Хороший вопрос!
Во-первых, я не уверен, LimitedConcurrencyLevelTaskScheduler
является академически правильным решением.Чтобы ограничить число одновременных запросов до N, необходимо заблокировать N задач, что в первую очередь сводит на нет цель использования асинхронных вызовов APM.
Сказав, что это целоенамного проще реализовать, чем альтернативу.Вы должны иметь рабочую очередь и вести учет количества запросов на полеты, а затем создавать рабочие задачи по мере необходимости.Это не тривиально, чтобы получить право, и если число N одновременных запросов будет небольшим, наличие N заблокированных потоков не конец света.
Итак, проблема с вашим кодом состоит в том, что задачи, созданные в других задачахиспользуйте планировщик из родительской задачи.На самом деле это не так для задач, созданных с помощью FromAsync
, поскольку они используют базовую реализацию APM и поэтому немного отличаются.
Вы создаете задачи в Main
с помощью:
return factory.StartNew( () =>
{
var task = getTask();
task.Wait();
}
);
factory
использует LimitedConcurrencyLevelTaskScheduler( 1 )
, поэтому только 1 из этих задач может выполняться одновременно, и эта задача ожидает выполнения задачи, возвращенной из getTask()
.
Итак, в GetReadTask
вы вызываете Task<int>.Factory.FromAsync
.Это происходит потому, что FromAsync
не учитывает планировщик родительской задачи.
Затем вы создаете продолжение с .ContinueWith(task => fileStream.Close())
.Это создает задачу, которая уважает планировщик своего родителя.Поскольку LimitedConcurrencyLevelTaskScheduler
уже выполняет задачу (которая в Main
заблокирована), продолжение не может быть запущено, и у вас есть тупик.
Решение состоит в том, чтобы запустить продолжение в обычном потоке пула потоков сTaskScheduler.Default
.Затем он запускается, и тупик не работает.
Вот мое решение:
static Task QueueReadTask( TaskScheduler ts, int number )
{
Output.Write( "QueueReadTask( " + number + " )" );
return Task.Factory.StartNew( () =>
{
Output.Write( "Opening file " + number + "." );
FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
byte[] buffer = new byte[ 32 ];
var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
var tClose = tRead.ContinueWith( task =>
{
Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
fileStream.Close();
}
, TaskScheduler.Default
);
tClose.Wait();
}
, CancellationToken.None
, TaskCreationOptions.None
, ts
);
}
И Main
теперь выглядит так:
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
int[] range = { 1, 2, 3 };
var tasks = range.Select( number =>
{
var task = QueueReadTask( ts, number );
return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
}
)
.ToArray();
Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
Task.WaitAll( tasks );
Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}
Естьобратите внимание на пару вещей:
Перемещение task.Wait()
в QueueReadTask
делает более очевидным, что вы блокируете задачу.Вы можете удалить вызов FromAsync
и продолжение и заменить их обычным синхронным вызовом, поскольку вы все равно блокируете.
Задание, возвращаемое из QueueReadTask
, может иметь продолжения.По умолчанию они выполняются в планировщике по умолчанию, поскольку они наследуют планировщик родительской задачи, а не предшествующий.В этом случае нет родительской задачи, поэтому используется планировщик по умолчанию.