Есть ли лучший способ ждать очереди? - PullRequest
22 голосов
/ 26 июня 2009

Есть ли лучший способ ожидания потоков в очереди, прежде чем выполнять другой процесс?

В настоящее время я делаю:

this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable

// Initiate process

foreach (string someString in arrayStrings)
{
     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     Thread.Sleep(100);
}

// Waiting execution for all queued threads
lock (this.workerLocker)  // Global variable (object)
{
     while (this.RunningWorkers > 0)
     {
          Monitor.Wait(this.workerLocker);
     }
}

// Do anything else    
Console.WriteLine("END");

// Method DoSomething() definition
public void DoSomething(object data)
{
    // Do a slow process...
    .
    .
    .

    lock (this.workerLocker)
    {
        this.RunningWorkers--;
        Monitor.Pulse(this.workerLocker);
    }
}

Ответы [ 8 ]

14 голосов
/ 26 июня 2009

Возможно, вы захотите взглянуть на AutoResetEvent и ManualResetEvent.

Они предназначены именно для этой ситуации (ожидание завершения потока ThreadPool перед выполнением «чего-либо»).

Вы бы сделали что-то вроде этого:

static void Main(string[] args)
{
    List<ManualResetEvent> resetEvents = new List<ManualResetEvent>();
    foreach (var x in Enumerable.Range(1, WORKER_COUNT))
    {
        ManualResetEvent resetEvent = new ManualResetEvent();
        ThreadPool.QueueUserWorkItem(DoSomething, resetEvent);
        resetEvents.Add(resetEvent);
    }

    // wait for all ManualResetEvents
    WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-)
}

public static void DoSomething(object data)
{
    ManualResetEvent resetEvent = data as ManualResetEvent;

    // Do something

    resetEvent.Set();
}

Редактировать: Забыл упомянуть, что вы можете ждать одну ветку, любую ветку и так далее. Также в зависимости от вашей ситуации AutoResetEvent может немного упростить ситуацию, поскольку он (как следует из названия) может автоматически сигнализировать о событиях: -)

13 голосов
/ 30 июня 2009

Как насчет Fork и Join, которые используют просто Monitor; -p

Forker p = new Forker();
foreach (var obj in collection)
{
    var tmp = obj;
    p.Fork(delegate { DoSomeWork(tmp); });
}
p.Join();

Полный код показан на этом предыдущем ответе .

Или для очереди производителя / потребителя с ограниченным размером (поточно-ориентированной и т. Д.), здесь .

4 голосов
/ 03 июля 2009

В дополнение к Барьеру, указанному Хенком Холтерманом (кстати, его использование очень неправильного использования Барьера, см. Мой комментарий к его ответу), .NET 4.0 предоставляет целый ряд других опций (для использования их в .NET 3.5 необходимо загрузить дополнительную DLL от Microsoft ). Я написал в блоге сообщение , в котором перечислены все , но мой любимый, безусловно, Parallel.ForEach:

Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

За кулисами Parallel.ForEach ставит в очередь новый и улучшенный пул потоков и ожидает завершения всех потоков.

3 голосов
/ 02 июля 2009

Да, есть.

Предлагаемый подход

1) счетчик и ручка ожидания

int ActiveCount = 1; // 1 (!) is important
EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);

2) добавление цикла

foreach (string someString in arrayStrings)
{
     Interlocked.Increment(ref ActiveCount);

     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     // Thread.Sleep(100); // you really need this sleep ?
}

PostActionCheck();
ewhAllDone.Wait();

3) DoSomething должно выглядеть как

{
    try
    {
        // some long executing code
    }
    finally
    {
        // ....
        PostActionCheck();
    }
} 

4) где PostActionCheck равно

void PostActionCheck()
{
    if (Interlocked.Decrement(ref ActiveCount) == 0)
        ewhAllDone.Set();
}

Идея

ActiveCount инициализируется с помощью 1, а затем увеличивается * в 1022 * раз.

PostActionCheck вызывается n + 1 раз. Последний вызовет событие.

Преимущество этого решения состоит в том, что он использует один объект ядра (который является событием) и 2 * n + 1 вызовы легких API. (Может быть меньше?)

приписка

Я написал здесь код, возможно, я неправильно написал некоторые имена классов.

3 голосов
/ 29 июня 2009

Мне действительно нравится Begin-End-Async Pattern , когда мне приходится ждать завершения задач.

Я бы посоветовал вам обернуть BeginEnd в рабочий класс:

public class StringWorker
{
    private string m_someString;
    private IAsyncResult m_result;

    private Action DoSomethingDelegate;

    public StringWorker(string someString)
    {
        DoSomethingDelegate = DoSomething;
    }

    private void DoSomething()
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginDoSomething()
    {
        if (m_result != null) { throw new InvalidOperationException(); }
        m_result = DoSomethingDelegate.BeginInvoke(null, null);
        return m_result;
    }

    public void EndDoSomething()
    {
        DoSomethingDelegate.EndInvoke(m_result);
    }
}

Для начала и работы используйте фрагмент кода:

List<StringWorker> workers = new List<StringWorker>();

foreach (var someString in arrayStrings)
{
    StringWorker worker = new StringWorker(someString);
    worker.BeginDoSomething();
    workers.Add(worker);
}

foreach (var worker in workers)
{
    worker.EndDoSomething();
}

Console.WriteLine("END");

И это все.

Sidenote: Если вы хотите получить результат из BeginEnd, измените «Действие» на Func и измените EndDoSomething, чтобы он возвращал тип.

public class StringWorker
{
    private string m_someString;
    private IAsyncResult m_result;

    private Func<string> DoSomethingDelegate;

    public StringWorker(string someString)
    {
        DoSomethingDelegate = DoSomething;
    }

    private string DoSomething()
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginDoSomething()
    {
        if (m_result != null) { throw new InvalidOperationException(); }
        m_result = DoSomethingDelegate.BeginInvoke(null, null);
        return m_result;
    }

    public string EndDoSomething()
    {
        return DoSomethingDelegate.EndInvoke(m_result);
    }
}
2 голосов
/ 26 июня 2009

.NET 4.0 имеет новый класс для этого, Барьер .

Кроме того, ваш метод не так уж и плох, и вы могли бы немного оптимизировать его, используя только Pulsing, если значение RunningWorkers равно 0 после уменьшения. Это будет выглядеть так:

this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable

foreach (string someString in arrayStrings)
{
     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     //Thread.Sleep(100);
}

// Waiting execution for all queued threads
Monitor.Wait(this.workerLocker);

// Method DoSomething() definition
public void DoSomething(object data)
{
    // Do a slow process...
    // ...

    lock (this.workerLocker)
    {
        this.RunningWorkers--;
        if (this.RunningWorkers == 0)
           Monitor.Pulse(this.workerLocker);
    }
}

Вы можете использовать EventWaithandle или AutoResetEvent, но все они являются оболочками Win32 API. Поскольку класс Monitor является чисто управляемым кодом, я бы предпочел Monitor для этой ситуации.

0 голосов
/ 03 июля 2009

Использовать пружинную резьбу. Он имеет встроенные реализации Barrier.

http://www.springsource.org/extensions/se-threading-net

0 голосов
/ 26 июня 2009

Я не уверен, что это действительно так, недавно я сделал что-то похожее для сканирования каждого IP подсети на предмет принятия определенного порта.

Несколько вещей, которые я могу предложить, которые могут улучшить производительность:

  • Используйте метод SetMaxThreads ThreadPool, чтобы настроить производительность (т. Е. Сбалансировать одновременное выполнение большого количества потоков и время блокировки на таком большом количестве потоков).

  • Не спите при настройке всех рабочих элементов, в этом нет особой необходимости (о чем я сразу знаю. Однако спите внутри метода DoSomething, возможно, только на миллисекунду, чтобы позволить другим потокам прыгайте туда, если это будет необходимо.

Я уверен, что вы могли бы реализовать более специализированный метод самостоятельно, но я сомневаюсь, что он будет более эффективным, чем использование ThreadPool.

P.S Я не на 100% ясен о причине использования монитора, так как вы все равно блокируете? Обратите внимание, что вопрос задается только потому, что я ранее не использовал класс Monitor, а не потому, что я на самом деле сомневаюсь в его использовании.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...