Вам было бы намного лучше, если бы вы сосредоточились на использовании aysnc-версий API-хранилищ больших двоичных объектов и / или Stream
API-интерфейсов, чтобы вы были связаны с вводом-выводом, а не с процессором. Везде, где есть API BeginXXX, вы должны использовать его, заключив его в Task.Factory.FromAsync
и используя продолжение оттуда. В вашем конкретном случае вы должны использовать кредит CloudBlob.BeginUploadFromStream
. То, как вы получаете поток на начальном этапе, так же важно, так что ищите и асинхронные API на этом конце.
Единственное, что может удержать вас от использования небольшого экземпляра после этого, это то, что он ограничен скоростью 100 Мбит / с, а средняя скорость - 200 Мбит / с. Опять же, вы всегда можете использовать коэффициент эластичности и увеличить количество ролей, когда вам нужно больше обработки, и снова уменьшить, когда все успокоится.
Вот пример того, как вы бы назвали BeginUploadFromStream
, используя FromAsync
. Теперь, что касается координации параллельной обработки, поскольку теперь вы запускаете асинхронные задачи, вы не можете рассчитывать на Parallel :: ForEach, чтобы ограничить максимальный параллелизм для вас. Это означает, что у вас будет обычный foreach в исходном потоке с Semaphore
для ограничения параллелизма. Это обеспечит эквивалент MaxDegreeOfParallelism
:
// Setup a semaphore to constrain the max # of concurrent "thing"s we will process
int maxConcurrency = ... read from config ...
Semaphore maxConcurrentThingsToProcess = new Semaphore(maxConcurrency, maxConcurrency);
// Current thread will enumerate and dispatch I/O work async, this will be the only CPU resource we're holding during the async I/O
foreach(Thing thing in myThings)
{
// Make sure we haven't reached max concurrency yet
maxConcurrentThingsToProcess.WaitOne();
try
{
Stream mySourceStream = ... get the source stream from somewhere ...;
CloudBlob myCloudBlob = ... get the blob from somewhere ...;
// Begin uploading the stream asynchronously
Task uploadStreamTask = Task.Factory.FromAsync(
myCloudBlob.BeginUploadFromStream,
myCloudBlob.EndUploadFromStream,
mySourceStream,
null);
// Setup a continuation that will fire when the upload completes (regardless of success or failure)
uploadStreamTask.ContinueWith(uploadStreamAntecedent =>
{
try
{
// upload completed here, do any cleanup/post processing
}
finally
{
// Release the semaphore so the next thing can be processed
maxConcurrentThingsToProcess.Release();
}
});
}
catch
{
// Something went wrong starting to process this "thing", release the semaphore
maxConcurrentThingsToProcess.Release();
throw;
}
}
Теперь в этом примере я не показываю, как вы также должны получать исходный поток асинхронно, но если, например, вы загружаете этот поток с URL-адреса в другом месте, вы также захотите запустить его асинхронно и здесь цепочка начала асинхронной загрузки в продолжение.
Поверьте, я знаю, что это больше, чем просто выполнение простого Parallel::ForEach
, но существует Parallel::ForEach
, чтобы упростить параллелизм для задач, связанных с процессором. Когда дело доходит до ввода-вывода, использование асинхронных API-интерфейсов является единственным способом достижения максимальной пропускной способности ввода-вывода при минимизации ресурсов ЦП.