Несколько потоков, ожидающих одного события? - PullRequest
5 голосов
/ 24 марта 2011

То, что (я думаю) я хочу, это эквивланта AutoResetEvent, который может ждать несколько потоков, и все это будет возобновлено, когда он установлен.

Я знаю, что этого можно достичь, имея один AutoResetEvent на поток и устанавливая каждый из них - но есть ли более простой способ? Способ, который не зависит от массивов дескрипторов событий?

Эффективно, что (я думаю) я бы хотел иметь возможность сделать это:

private volatile string state;
private MultiEventHandle stateChanged = new MultiEventHandle();

public void WaitForBlob()
{
  while (true)
  {
    object saved = stateChanged.Current;  // some sentinel value
    if (state == "Blob") break;
    stateChanged.WaitTilNot(saved);  // wait til sentinel value != "current"
  }
}

public void SetBlob()
{
  state = "Blob";
  stateChanged.Change();  // stateChanged.Current becomes a new sentinel object
}

т. Е. Любое число потоков может вызвать WaitForBlob, и в любое время (без условий гонки) SetBlob может быть вызвано еще одним потоком, и все ожидающие потоки обнаружат изменение немедленно - и, что важно, без Спин-замки или Threading.Sleeps.

Теперь я думаю, что могу относительно легко реализовать "MultiEventHandle". Но мой вопрос ... есть ли лучший способ? Конечно, я ошибаюсь, потому что это довольно распространенный случай, но я не могу найти встроенный инструмент для этой работы. Боюсь, я собираюсь изобрести здесь квадратное колесо ..

Ответы [ 3 ]

3 голосов
/ 24 марта 2011

Я обернул возможное решение в класс «WatchedVariable», используя Monitor.PulseAll / Wait за кулисами (узнавая немного о классе Monitor в процессе).Публикация здесь на случай, если кто-нибудь еще столкнется с той же проблемой - может быть полезным для неизменных структур данных.Спасибо Джону Скиту за помощь.

Использование:

private WatchedVariable<string> state;

public void WaitForBlob()
{
  string value = state.Value;
  while (value != "Blob")
  {
    value = state.WaitForChange(value);
  }
}

Реализация:

public class WatchedVariable<T>
    where T : class
{
    private volatile T value;
    private object valueLock = new object();

    public T Value
    {
        get { return value; }
        set
        {
            lock (valueLock)
            {
                this.value = value;
                Monitor.PulseAll(valueLock);  // all waiting threads will resume once we release valueLock
            }
        }
    }

    public T WaitForChange(T fromValue)
    {
        lock (valueLock)
        {
            while (true)
            {
                T nextValue = value;
                if (nextValue != fromValue) return nextValue;  // no race condition here: PulseAll can only be reached once we hit Wait()
                Monitor.Wait(valueLock);  // wait for a changed pulse
            }
        }
    }

    public WatchedVariable(T initValue)
    {
        value = initValue;
    }
}

Пока мои тестовые примеры пройдены, используйте на свой страх и риск.

Теперь проконсультируйтесь с meta, чтобы выяснить, какой ответ я должен принять ..

2 голосов
/ 24 марта 2011

Есть ли причина не использовать ManualResetEvent?Это не сбрасывает себя, когда один ожидающий поток прошел, поэтому они все будут освобождены.

Конечно, это означает, что если вам нужно Reset событие после того, как все ожидающие потоки прошлиВам понадобится какой-то способ обнаружить это.Вы могли бы возможно использовать вместо Semaphore, но я подозреваю, что это будет сложно.

Вам нужно сбросить событие сразу после его установки, в вашем случае?

1 голос
/ 13 августа 2017

Я придумал другое решение этой проблемы. Предполагается, что потоки, ожидающие события, не являются зацикленными на вызовах WaitOne(), и между вызовами ожидания есть некоторая работа. Он использует один AutoResetEvent, вызывает WaitOne(0) и Set() непрерывно, пока другие потоки не будут ожидать события.

// the only event we'll use:
AutoResetEvent are = new AutoResetEvent(false);
// starting threads:
for (int i = 0; i < 10; i++)
{
    string name = "T" + i; 
    new Thread(() => { while (true) { are.WaitOne(); WriteLine(name); } }).Start();
}

// release all threads and continue:
while (!are.WaitOne(0))
    are.Set();

Приведенный выше код протестирован на 1000 потоков, он все их освободил (хотя в цикле while слишком много лишних итераций, которые можно легко ограничить нулем, когда между вызовами ожидания в вызове немного больше работы). резьб.

Что-то, что мне не ясно из документации, это то, возможно ли для Set () выпустить WaitOne (), который был вызван позже в том же потоке - если такая ситуация возможна, это решение небезопасно поскольку он может не освободить все потоки перед выходом из цикла while. Было бы хорошо, если бы кто-то мог пролить свет на это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...