Библиотека параллельных задач WaitAny Design - PullRequest
1 голос
/ 09 июня 2010

Я только начал изучать PTL и у меня возник вопрос по дизайну.

Мой сценарий: У меня есть список URL, каждый из которых ссылается на изображение. Я хочу, чтобы каждое изображение загружалось параллельно. Как только будет загружен хотя бы один образ, я хочу выполнить метод, который делает что-то с загруженным образом. Этот метод НЕ должен быть распараллелен - он должен быть последовательным.

Я думаю, что следующее будет работать, но я не уверен, что это правильный способ сделать это. Поскольку у меня есть отдельные классы для сбора изображений и для выполнения «чего-то» с собранными изображениями, я заканчиваю тем, что обхожу массив Задач, который кажется неправильным, поскольку он раскрывает внутреннюю работу по извлечению изображений. Но я не знаю, как обойти это. На самом деле оба этих метода - это нечто большее, но для этого это не важно. Просто знайте, что их не следует объединять в один большой метод, который одновременно извлекает и делает что-то с изображением.

//From the Director class
Task<Image>[] downloadTasks = collector.RetrieveImages(listOfURLs);

for (int i = 0; i < listOfURLs.Count; i++)
{
    //Wait for any of the remaining downloads to complete
    int completedIndex = Task<Image>.WaitAny(downloadTasks);
    Image completedImage = downloadTasks[completedIndex].Result;

    //Now do something with the image (this "something" must happen serially)
    //Uses the "Formatter" class to accomplish this let's say
}

///////////////////////////////////////////////////

//From the Collector class
public Task<Image>[] RetrieveImages(List<string> urls)
{
    Task<Image>[] tasks = new Task<Image>[urls.Count];

    int index = 0;
    foreach (string url in urls)
    {
        string lambdaVar = url;  //Required... Bleh
        tasks[index] = Task<Image>.Factory.StartNew(() =>
            {
                using (WebClient client = new WebClient())
                {
                    //TODO: Replace with live image locations
                    string fileName = String.Format("{0}.png", i);
                    client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName));
                }

                return Image.FromFile(Path.Combine(Application.StartupPath, fileName));
            },
            TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent);

        index++;
    }

    return tasks;
}

Ответы [ 4 ]

9 голосов
/ 26 июня 2010

Как правило, вы используете WaitAny для ожидания одной задачи, если вам не нужны результаты других.Например, если вы только что позаботились о первом возвращаемом изображении.

Как об этом вместо этого.

Это создает две задачи, одна из которых загружает изображения и добавляет их в коллекцию блокирующих.Вторая задача ожидает коллекции и обрабатывает любые изображения, добавленные в очередь.Когда все изображения загружены, первая задача закрывает очередь, поэтому вторая задача может быть закрыта.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Net;
using System.Threading.Tasks;

namespace ClassLibrary1
{
    public class Class1
    {
        readonly string _path = Directory.GetCurrentDirectory();

        public void Demo()
        {
            IList<string> listOfUrls = new List<string>();
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");

            BlockingCollection<Image> images = new BlockingCollection<Image>();

            Parallel.Invoke(
                () =>                   // Task 1: load the images
                {
                    Parallel.For(0, listOfUrls.Count, (i) =>
                        {
                            Image img = RetrieveImages(listOfUrls[i], i);
                            img.Tag = i;
                            images.Add(img);    // Add each image to the queue
                        });
                    images.CompleteAdding();    // Done with images.
                },
                () =>                   // Task 2: Process images serially
                {
                    foreach (var img in images.GetConsumingEnumerable())
                    {
                        string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag));
                        Console.WriteLine("Rotating image {0}", img.Tag);
                        img.RotateFlip(RotateFlipType.RotateNoneFlipXY);

                        img.Save(newPath);
                    }
                });
        }

        public Image RetrieveImages(string url, int i)
        {
            using (WebClient client = new WebClient())
            {
                string fileName = Path.Combine(_path, String.Format("{0}.png", i));
                Console.WriteLine("Downloading {0}...", url);
                client.DownloadFile(url, Path.Combine(_path, fileName));
                Console.WriteLine("Saving {0} as {1}.", url, fileName);
                return Image.FromFile(Path.Combine(_path, fileName));
            }
        } 
    }
}

ПРЕДУПРЕЖДЕНИЕ. В коде нет проверки или отмены ошибок.Уже поздно и тебе нужно что-то делать правильно?:)

Это пример шаблона конвейера.Предполагается, что получение изображения происходит довольно медленно и что затраты на блокировку внутри коллекции блокировок не вызовут проблем, поскольку это происходит относительно редко по сравнению со временем, потраченным на загрузку изображений.

Наша книга ..Подробнее об этом и других шаблонах параллельного программирования вы можете прочитать в http://parallelpatterns.codeplex.com/ Глава 7 посвящена конвейерам, а в прилагаемых примерах показаны конвейеры с обработкой и отменой ошибок.

2 голосов
/ 09 июня 2010

TPL уже предоставляет функцию ContinueWith для выполнения одной задачи по завершении другой.Цепочка задач является одним из основных шаблонов, используемых в TPL для асинхронных операций.

Следующий метод загружает набор изображений и продолжает переименование каждого из файлов

static void DownloadInParallel(string[] urls)
{
   var tempFolder = Path.GetTempPath();

   var downloads = from url in urls
                   select Task.Factory.StartNew<string>(() =>{
                       using (var client = new WebClient())
                       {
                           var uri = new Uri(url);
                           string file = Path.Combine(tempFolder,uri.Segments.Last());
                           client.DownloadFile(uri, file);
                           return file;
                       }
                   },TaskCreationOptions.LongRunning|TaskCreationOptions.AttachedToParent)
                  .ContinueWith(t=>{
                       var filePath = t.Result;
                       File.Move(filePath, filePath + ".test");
                  },TaskContinuationOptions.ExecuteSynchronously);

    var results = downloads.ToArray();
    Task.WaitAll(results);
}

Также следует проверить асинхронные задачи WebClient из примеров ParallelExtensionsExtras.Методы расширения DownloadXXXTask обрабатывают как создание задач, так и асинхронную загрузку файлов.

Следующий метод использует расширение DownloadDataTask для получения данных образа и его поворота перед сохранением на диск

static void DownloadInParallel2(string[] urls)
{
    var tempFolder = Path.GetTempPath();

    var downloads = from url in urls
         let uri=new Uri(url)
         let filePath=Path.Combine(tempFolder,uri.Segments.Last())
         select new WebClient().DownloadDataTask(uri)                                                        
         .ContinueWith(t=>{
            var img = Image.FromStream(new MemoryStream(t.Result));
            img.RotateFlip(RotateFlipType.RotateNoneFlipY);
            img.Save(filePath);
         },TaskContinuationOptions.ExecuteSynchronously);

    var results = downloads.ToArray();
    Task.WaitAll(results);
}
0 голосов
/ 13 мая 2013

// скачать все изображения

private async void GetAllImages ()
{
    var downloadTasks = listOfURLs.Where(url =>   !string.IsNullOrEmpty(url)).Select(async url =>
            {
                var ret = await RetrieveImage(url);
                return ret;
        }).ToArray();

        var counts = await Task.WhenAll(downloadTasks);
}

//From the Collector class
public async Task<Image> RetrieveImage(string url)
{
    var lambdaVar = url;  //Required... Bleh
    using (WebClient client = new WebClient())
    {
        //TODO: Replace with live image locations
        var fileName = String.Format("{0}.png", i);
        await client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName));
    }
    return Image.FromFile(Path.Combine(Application.StartupPath, fileName));
}  
0 голосов
/ 09 июня 2010

Лучшим способом сделать это, вероятно, будет реализация шаблона Observer: пусть ваша функция RetreiveImages реализует IObservable , поместите ваше "действие завершенного изображения" в объект IObserver OnNext и подпишитесь на RetreiveImages.

Я еще не пробовал это сам (все еще нужно больше играть с библиотекой задач), но я думаю, что это "правильный" способ сделать это.

...