Дождитесь завершения объединенных потоков - PullRequest
38 голосов
/ 12 февраля 2009

Извините за лишний вопрос. Тем не менее, я нашел много решений моей проблемы, но ни одно из них не очень хорошо объяснено. Я надеюсь, что это будет ясно, здесь.

Основной поток моего приложения C # порождает 1..n фоновых рабочих, использующих ThreadPool. Я хочу, чтобы оригинальная нить была заблокирована, пока все рабочие не закончили. В частности, я исследовал ManualResetEvent, но мне не ясно, как его использовать.

В псевдо:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

При необходимости я буду знать количество работников, которые должны быть поставлены в очередь, до этого.

Ответы [ 9 ]

54 голосов
/ 12 февраля 2009

Попробуй это. Функция принимает список делегатов Action. Он добавит рабочую запись ThreadPool для каждого элемента в списке. Перед возвратом он будет ждать завершения каждого действия.

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList();
    var handles = new ManualResetEvent[actions.Count()];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}
30 голосов
/ 12 февраля 2009

Вот другой подход - инкапсуляция; поэтому ваш код может быть таким простым:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

Где класс Forker указан ниже (мне стало скучно в поезде ;-p) ... опять же, это позволяет избежать объектов ОС, но довольно аккуратно (IMO):

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}
13 голосов
/ 12 февраля 2009

Во-первых, как долго исполняют рабочие? Пулы потоков обычно следует использовать для кратковременных задач - если они будут работать некоторое время, рассмотрите ручные потоки.

Ре проблема; вам на самом деле нужно заблокировать основной поток? Можете ли вы использовать обратный вызов вместо этого? Если так, то что-то вроде:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

Это довольно легкий (без примитивов ОС) способ отслеживания рабочих.

Если вам нужно для блокировки, вы можете сделать то же самое, используя Monitor (опять же, избегая объекта ОС):

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        endOfThread();
        Monitor.Wait(syncLock);
    }
    Console.WriteLine("all done");
8 голосов
/ 01 октября 2009

Я использую новую библиотеку параллельных задач в CTP здесь :

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });
3 голосов
/ 17 апреля 2012

Вот решение с использованием класса CountdownEvent.

var complete = new CountdownEvent(1);
foreach (var o in collection)
{
  var capture = o;
  ThreadPool.QueueUserWorkItem((state) =>
    {
      try
      {
        DoSomething(capture);
      }
      finally
      {
        complete.Signal();
      }
    }, null);
}
complete.Signal();
complete.Wait();

Конечно, если у вас есть доступ к классу CountdownEvent, у вас есть весь TPL для работы. Класс Parallel позаботится о том, чтобы ждать вас.

Parallel.ForEach(collection, o =>
  {
    DoSomething(o);
  });
1 голос
/ 15 июля 2009

Использование .NET 4.0 Барри r класс:

        Barrier sync = new Barrier(1);

        foreach(var o in collection)
        {
            WaitCallback worker = (state) => 
            {
                // do work
                sync.SignalAndWait();
            };

            sync.AddParticipant();
            ThreadPool.QueueUserWorkItem(worker, o);
        }

        sync.SignalAndWait();
1 голос
/ 12 мая 2009

Я нашел хорошее решение здесь:

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

Может пригодиться другим с такой же проблемой

1 голос
/ 12 февраля 2009

Я думаю, что вы были на правильном пути с ManualResetEvent. Эта ссылка содержит пример кода, который точно соответствует тому, что вы пытаетесь сделать. Ключ должен использовать WaitHandle.WaitAll и передать массив событий ожидания. Каждый поток должен установить одно из этих событий ожидания.

   // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

Edit:

Убедитесь, что в пути кода рабочего потока вы задали событие (то есть autoEvents 1 .Set ();). Как только они все будут оповещены, waitAll вернется.

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}
0 голосов
/ 05 марта 2019

Попробуйте использовать CountdownEvent

// code before the threads start

CountdownEvent countdown = new CountdownEvent(collection.Length);

foreach (var o in collection)
{
    ThreadPool.QueueUserWorkItem(delegate
    {
        // do something with the worker
        Console.WriteLine("Thread Done!");
        countdown.Signal();
    });
}
countdown.Wait();

Console.WriteLine("Job Done!");

// resume the code here

Обратный отсчет будет ждать, пока все потоки не завершат выполнение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...