Каков наилучший способ, чтобы несколько потоков работали и ожидали завершения всех из них? - PullRequest
13 голосов
/ 16 декабря 2009

Я пишу простое приложение (для моей жены не менее :-P), которое выполняет некоторые манипуляции с изображениями (изменение размера, отметки времени и т. Д.) Для потенциально большой серии изображений. Поэтому я пишу библиотеку, которая может делать это как синхронно, так и асинхронно. Я решил использовать Асинхронный шаблон на основе событий . При использовании этого шаблона вам необходимо вызвать событие, когда работа будет завершена. Здесь я испытываю проблемы, зная, когда это будет сделано. Итак, в основном, в моем методе DownsizeAsync (асинхронный метод для уменьшения размера изображений) я делаю что-то вроде этого:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
     }

Самая сложная часть - знать, когда все они завершены.

Вот что я рассмотрел: Использование ManualResetEvents, как здесь: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx Но проблема, с которой я столкнулся, состоит в том, что вы можете ждать только 64 или менее событий. У меня может быть намного больше изображений.

Второй вариант: иметь счетчик, который подсчитывает выполненные изображения, и поднять событие, когда счет достигнет общей суммы:

public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; //countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
            total++;
            if (total == files.Length)
            {
                this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
            }
        });
    }


}

private volatile int total = 0;

Теперь это кажется "хакерским", и я не совсем уверен, что это потокобезопасно.

Итак, мой вопрос: каков наилучший способ сделать это? Есть ли другой способ синхронизации всех потоков? Я не должен использовать ThreadPool? Спасибо !!

ОБНОВЛЕНИЕ Основываясь на отзывах в комментариях и на нескольких ответах, я решил использовать этот подход:

Во-первых, я создал метод расширения, который объединяет перечислимое в «пакеты»:

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }

В основном, если вы делаете что-то вроде этого:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }

Вы получите этот вывод:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95

Идея в том, что (как кто-то в комментариях указал) нет необходимости создавать отдельную ветку для каждого изображения. Поэтому я буду пакетировать изображения в число [machine.cores * 2] пакетов. Затем я воспользуюсь вторым подходом, который заключается в том, чтобы просто поддерживать счетчик, и когда счетчик достигнет ожидаемого результата, я буду знать, что все сделано.

Причина, по которой я теперь убежден, что это на самом деле потокобезопасность, заключается в том, что я пометил общую переменную как volatile, которое согласно MSDN :

Обычно используется летучий модификатор для поля, к которому обращаются несколько потоков без использования оператор блокировки для сериализации доступа. Использование модификатора volatile обеспечивает что один поток извлекает больше всего актуальная стоимость, написанная другим нить

означает, что я должен быть в открытом состоянии (если нет, пожалуйста, дайте мне знать !!)

Итак, вот код, с которым я собираюсь:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = files.Length / cores;

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }

Я открыт для обратной связи, так как я никоим образом не являюсь экспертом по многопоточности, поэтому, если кто-то видит какие-либо проблемы с этим или у него есть идея получше, пожалуйста, дайте мне знать. (Да, это просто домашнее приложение, но у меня есть некоторые идеи о том, как я могу использовать полученные знания для улучшения нашей службы поиска / индексации, которую мы используем на работе.) Пока я буду держать этот вопрос открытым до чувствую, что я использую правильный подход. Спасибо всем за помощь.

Ответы [ 8 ]

11 голосов
/ 16 декабря 2009

Вы все еще хотите использовать ThreadPool, потому что он будет управлять количеством потоков, которые он запускает одновременно. Недавно я столкнулся с подобной проблемой и решил ее так:

var dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);

foreach (var image in images)
{
    dispatcher.Add(new ResizeJob(image));
}

dispatcher.WaitForJobsToFinish();

IDispatcher и IJob выглядят так:

public interface IJob
{
    void Execute();
}

public class ThreadPoolDispatcher : IDispatcher
{
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();

    public void Dispatch(IJob job)
    {
        var resetEvent = CreateAndTrackResetEvent();
        var worker = new ThreadPoolWorker(job, resetEvent);
        ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
    }

    private ManualResetEvent CreateAndTrackResetEvent()
    {
        var resetEvent = new ManualResetEvent(false);
        resetEvents.Add(resetEvent);
        return resetEvent;
    }

    public void WaitForJobsToFinish()
    {
        WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { });
        resetEvents.Clear();
    }
}

А затем использовал декоратор для разделения использования ThreadPool:

public class ChunkingDispatcher : IDispatcher
{
    private IDispatcher dispatcher;
    private int numberOfJobsDispatched;
    private int chunkSize;

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
    {
        this.dispatcher = dispatcher;
        this.chunkSize = chunkSize;
    }

    public void Dispatch(IJob job)
    {
        dispatcher.Dispatch(job);

        if (++numberOfJobsDispatched % chunkSize == 0)
            WaitForJobsToFinish();
    }

    public void WaitForJobsToFinish()
    {
        dispatcher.WaitForJobsToFinish();
    }
}

Абстракция IDispatcher работает очень хорошо для замены вашей техники потоков. У меня есть другая реализация, которая является SingleThreadedDispatcher, и вы можете создать версию ThreadStart, как предложил Джон Скит. Тогда легко запустить каждый из них и посмотреть, какую производительность вы получите. SingleThreadedDispatcher хорош при отладке вашего кода или когда вы не хотите уничтожать процессор на вашем компьютере.

Редактировать: Я забыл добавить код для ThreadPoolWorker:

public class ThreadPoolWorker
{
    private IJob job;
    private ManualResetEvent doneEvent;

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
    {
        this.job = job;
        this.doneEvent = doneEvent;
    }

    public void ThreadPoolCallback(object state)
    {
        try
        {
            job.Execute();
        }
        finally
        {
            doneEvent.Set();
        }
    }
}
11 голосов
/ 16 декабря 2009

Самое простое - создать новые потоки, а затем вызвать Thread.Join для каждого из них. Вы могли бы использовать семафор или что-то в этом роде, но, вероятно, проще просто создавать новые темы.

В .NET 4.0 вы можете использовать Parallel Extensions, чтобы сделать это довольно легко с задачами.

В качестве другой альтернативы, в которой будет использовать пул потоков, вы можете создать делегат и вызвать на нем BeginInvoke, чтобы получить IAsyncResult - затем вы можете получить WaitHandle для каждого результата через свойство AsyncWaitHandle и вызов WaitHandle.WaitAll.

РЕДАКТИРОВАТЬ: Как указано в комментариях, вы можете вызывать WaitAll только с 64 дескрипторами одновременно в некоторых реализациях. Альтернативой может быть вызов WaitOne для каждого из них по очереди или вызов WaitAll с партиями. Это не будет иметь большого значения, если вы делаете это из потока, который не собирается блокировать пул потоков. Также обратите внимание, что вы не можете звонить WaitAll из потока STA.

5 голосов
/ 16 декабря 2009

Самое простое и эффективное решение - использовать счетчики и сделать их безопасными для потоков. Это потребует меньше памяти и может масштабироваться до большего числа потоков

Вот образец

int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
    Interlocked.Increment(ref itemCount);

    ThreadPool.QueueUserWorkItem(x=>{
        try
        {
            //code logic here.. sleep is just for demo
            Thread.Sleep(100);
        }
        finally
        {
            Interlocked.Decrement(ref itemCount);
        }
    });
}

while (itemCount > 0)
{
    Console.WriteLine("Waiting for " + itemCount + " threads...");
    Thread.Sleep(100);
}
Console.WriteLine("All Done!");
2 голосов
/ 16 декабря 2009

Я использую метод статической утилиты для проверки всех отдельных дескрипторов ожидания.

    public static void WaitAll(WaitHandle[] handles)
    {
        if (handles == null)
            throw new ArgumentNullException("handles",
                "WaitHandle[] handles was null");
        foreach (WaitHandle wh in handles) wh.WaitOne();
    }

Затем в моем основном потоке я создаю Список этих дескрипторов ожидания, и для каждого делегата, который я помещаю в свою очередь ThreadPool, я добавляю дескриптор ожидания в Список ...

 List<WaitHandle> waitHndls = new List<WaitHandle>();
 foreach (iterator logic )
 {
      ManualResetEvent txEvnt = new ManualResetEvent(false);

      ThreadPool.QueueUserWorkItem(
           delegate
               {
                   try { // Code to process each task... }
                   // Finally, set each wait handle when done
                   finally { lock (locker) txEvnt.Set(); } 
               });
      waitHndls.Add(txEvnt);  // Add wait handle to List
 }
 util.WaitAll(waitHndls.ToArray());   // Check all wait Handles in List
2 голосов
/ 16 декабря 2009

Я использовал SmartThreadPool с большим успехом, чтобы справиться с этой проблемой. Существует также Codeplex сайт о сборке.

SmartThreadPool может помочь с другими проблемами, так как некоторые потоки не могут работать одновременно, в то время как другие могут.

2 голосов
/ 16 декабря 2009

.Net 4.0 делает многопоточность еще проще (хотя вы все еще можете снимать себя с побочными эффектами).

1 голос
/ 17 декабря 2009

Я предлагаю поместить нетронутые изображения в очередь и, когда вы читаете из очереди, запускаете поток и вставляете его свойство System.Threading.Thread.ManagedThreadId в словарь вместе с именем файла. Таким образом, ваш пользовательский интерфейс может отображать как ожидающие, так и активные файлы.

Когда каждый поток завершает свою работу, он вызывает процедуру обратного вызова, возвращая свой ManagedThreadId. Этот обратный вызов (переданный в качестве делегата потоку) удаляет идентификатор потока из словаря, запускает другой поток из очереди и обновляет пользовательский интерфейс.

Когда очередь и словарь пусты, все готово.

Немного сложнее, но таким образом вы получаете отзывчивый пользовательский интерфейс, вы можете легко контролировать количество активных потоков и видеть, что происходит. Собирать статистику. Придумайте WPF и выставляйте индикаторы выполнения для каждого файла. Она не может не быть впечатлена.

1 голос
/ 16 декабря 2009

Другой вариант - использовать трубу.

Вы публикуете всю работу, которую необходимо выполнить, в канал, а затем читаете данные из канала из каждого потока. Когда труба пуста, все готово, потоки заканчиваются, и все довольны (конечно, убедитесь, что вы сначала производите всю работу, а затем потребляете ее)

...