Parallel.ForEach L oop с Retry Logi c in C# - PullRequest
0 голосов
/ 06 апреля 2020

Я использую Parallel.ForEach для загрузки нескольких файлов в C# из Google Bucket в папку. Я использую logry retry c, чтобы он мог повторить загрузку файлов в случае сбоя загрузки файлов во время загрузки. Как я могу применить повторные логи c для каждого файла или каждого потока в Parallel.ForEach l oop.

Parallel.ForEach(listFiles, objectName =>            
{
    retryCount = 0;                        
    countOfFiles++;
    downloadSuccess = false;
    bucketFileName = Path.GetFileName(objectName.Name);
    guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

    while (retryCount < retryCountInput && downloadSuccess == false)
    {
        try
        {
            FileStream fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write);
            using (fs)
            {                                               
                storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress).Wait();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception occured while downloading file: " + ex.ToString());                   
            Thread.Sleep(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
            retryCount++;

        }
    }
}

Ответы [ 2 ]

1 голос
/ 06 апреля 2020

Я бы поменял его на задачи и использовал асин c. Таким образом, ваш Thread.Sleep не блокирует поток пула потоков. Parallel.ForEach предназначен для работы с процессором.

Что-то вроде: (я не могу скомпилировать / протестировать это без остальной части вашего кода)

int retryCountInput = 5;
var tasks = new List<Task>();

foreach (var file in listFiles)
{
    var task = Task.Run(async () =>
    {
        // make it local
        int retryCount = 0;
        string bucketFileName = Path.GetFileName(objectName.Name);
        string guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

        while (retryCount < retryCountInput)
        {
            try
            {
                using (var fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write))
                    // Use await here, instead of `Wait()` so this threadpool thread
                    // can be used for other tasks.
                    await storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress);

                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception occured while downloading file: " + ex.ToString());

                // Use Task.Delay here, so this thread is 'released'
                await Task.Delay(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
                retryCount++;
            }
        }
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);
0 голосов
/ 23 апреля 2020

Я изменил свой код и удалил Parallel.ForEach вместо этого, используя foreach l oop для перебора файлов. Но сейчас я не могу найти все файлы по пути загрузки, хотя в журналах показываются все файлы, которые были загружены. Количество загруженных файлов в пути загрузки изменяется, и это поведение кажется случайным. Могу ли я использовать Task.Run для операций ввода-вывода?

var tasks = new List<Task>();
foreach (var objectName in listFiles)
{
    var task = Task.Run(() =>
    {
        downloadSuccess = false;
        bucketFileName = Path.GetFileName(objectName.Name);
        guidFolderPath = tempFolderLocation + "\\" + bucketFileName;

        var maxRetryAttempts = 3;
        var pauseBetweenFailures = TimeSpan.FromSeconds(2);
        RetryHelper.RetryOnException(maxRetryAttempts, pauseBetweenFailures, async () =>
        {
            FileStream fs = new FileStream(guidFolderPath, FileMode.Create,
                FileAccess.Write, FileShare.Write);
            using (fs)
            {
                var progress = new Progress<IDownloadProgress>(
                    p =>
                    {
                        DownloadProgress(p, retryCount, objectName.Name);
                    });

                await client.DownloadObjectAsync(bucketName, objectName.Name,
                    fs, option, cancellationToken.Token, progress);
            }
        });
    });
    tasks.Add(task);
}
await Task.WhenAll(tasks);
...