Использование Parallel.Foreach в небольшом лазурном экземпляре - PullRequest
2 голосов
/ 08 декабря 2011

У меня WebRole работает на маленьком экземпляре. В этой WebRole есть метод, который загружает большое количество файлов в хранилище BLOB. Согласно спецификациям экземпляров Azure, маленький экземпляр имеет только 1 ядро ​​. Итак, будет ли Parallel.Foreach при загрузке этих блобов давать мне какие-либо преимущества по сравнению с обычным Foreach?

Ответы [ 3 ]

5 голосов
/ 08 декабря 2011

Вам было бы намного лучше, если бы вы сосредоточились на использовании aysnc-версий API-хранилищ больших двоичных объектов и / или Stream API-интерфейсов, чтобы вы были связаны с вводом-выводом, а не с процессором. Везде, где есть API BeginXXX, вы должны использовать его, заключив его в Task.Factory.FromAsync и используя продолжение оттуда. В вашем конкретном случае вы должны использовать кредит CloudBlob.BeginUploadFromStream. То, как вы получаете поток на начальном этапе, так же важно, так что ищите и асинхронные API на этом конце.

Единственное, что может удержать вас от использования небольшого экземпляра после этого, это то, что он ограничен скоростью 100 Мбит / с, а средняя скорость - 200 Мбит / с. Опять же, вы всегда можете использовать коэффициент эластичности и увеличить количество ролей, когда вам нужно больше обработки, и снова уменьшить, когда все успокоится.

Вот пример того, как вы бы назвали BeginUploadFromStream, используя FromAsync. Теперь, что касается координации параллельной обработки, поскольку теперь вы запускаете асинхронные задачи, вы не можете рассчитывать на Parallel :: ForEach, чтобы ограничить максимальный параллелизм для вас. Это означает, что у вас будет обычный foreach в исходном потоке с Semaphore для ограничения параллелизма. Это обеспечит эквивалент MaxDegreeOfParallelism:

// Setup a semaphore to constrain the max # of concurrent "thing"s we will process
int maxConcurrency = ... read from config ...
Semaphore maxConcurrentThingsToProcess = new Semaphore(maxConcurrency, maxConcurrency);

// Current thread will enumerate and dispatch I/O work async, this will be the only CPU resource we're holding during the async I/O
foreach(Thing thing in myThings)
{
    // Make sure we haven't reached max concurrency yet
    maxConcurrentThingsToProcess.WaitOne();

    try
    {
        Stream mySourceStream = ... get the source stream from somewhere ...;
        CloudBlob myCloudBlob = ... get the blob from somewhere ...;

        // Begin uploading the stream asynchronously
        Task uploadStreamTask = Task.Factory.FromAsync(
            myCloudBlob.BeginUploadFromStream,
            myCloudBlob.EndUploadFromStream,
            mySourceStream,
            null);

        // Setup a continuation that will fire when the upload completes (regardless of success or failure)
        uploadStreamTask.ContinueWith(uploadStreamAntecedent =>
        {
            try
            {
                // upload completed here, do any cleanup/post processing
            }
            finally
            {
                // Release the semaphore so the next thing can be processed
                maxConcurrentThingsToProcess.Release();
            }
        });
    }
    catch
    {
        // Something went wrong starting to process this "thing", release the semaphore
        maxConcurrentThingsToProcess.Release();

        throw;
    }
}

Теперь в этом примере я не показываю, как вы также должны получать исходный поток асинхронно, но если, например, вы загружаете этот поток с URL-адреса в другом месте, вы также захотите запустить его асинхронно и здесь цепочка начала асинхронной загрузки в продолжение.

Поверьте, я знаю, что это больше, чем просто выполнение простого Parallel::ForEach, но существует Parallel::ForEach, чтобы упростить параллелизм для задач, связанных с процессором. Когда дело доходит до ввода-вывода, использование асинхронных API-интерфейсов является единственным способом достижения максимальной пропускной способности ввода-вывода при минимизации ресурсов ЦП.

3 голосов
/ 08 декабря 2011

Да, так будет, потому что каждая ваша загрузка будет связана с сетью, поэтому планировщик может разделить между собой ваше ядро. (Это, в конце концов, то, как одноядерные и однопроцессорные компьютеры выполняют более одной операции одновременно.)

Вы также можете использовать асинхронные функции загрузки BLOB-объектов для аналогичного эффекта.

3 голосов
/ 08 декабря 2011

Количество ядер напрямую не связано с количеством потоков, порожденных Parallel.ForEach().

Около года назад Дэвид Айкен провел очень неформальный тест с некоторым доступом к таблице blob +, с ибез Parallel.ForEach(), в Маленьком экземпляре.Вы можете увидеть результаты здесь .В этом случае было измеренным улучшением, поскольку это не было связано с процессором.Я подозреваю, что вы также увидите улучшение производительности, поскольку вы загружаете большое количество объектов в хранилище BLOB-объектов.

...