Перебирайте список в потоке постоянно, так как он получает новые элементы - PullRequest
4 голосов
/ 16 апреля 2009

Я действительно не уверен, как подойти к этому, но я подписываюсь на события, запускаемые в пользовательском классе, и в идеале я хочу поставить их в очередь и обрабатывать их в порядке поступления, когда они поступают. Мне известно о Queue<T> и я думаю, что я должен использовать это? но мой вопрос в обработчике событий, когда мое сообщение получено, могу ли я просто Enqueue() в очередь там, и если да, то как можно обойти очередь при добавлении новых элементов?

Я думал о вызове метода в конструкторе, который выполняет что-то вроде ( держись за ваши шляпы ):

while (true)
{
  foreach (var item in myQueue)
  {
    // process
    myQueue.Dequeue();
  }
}

Конечно, должен быть более элегантный способ сделать это? Это должно эффективно поразить myQueue и выполнять итерации, поскольку оно содержит элементы и делать то, что я хочу. Каким будет представление? Я могу порождать это в отдельном потоке, чтобы избежать какой-либо блокировки потока, у меня просто есть время принять while (true)!

Ответы [ 8 ]

4 голосов
/ 16 апреля 2009

Вы не можете сделать это. Перечислитель, возвращенный для foreach, выдаст исключение, если базовая коллекция будет изменена, пока перечислитель все еще используется.

По сути, вам нужно настроить другой поток для обработки событий. В идеале вы должны сообщить этому другому потоку (через событие или другой механизм синхронизации), что работа доступна. Это приведет к удалению из очереди рабочего элемента, его обработке, а затем переходит в режим ожидания до получения следующего сигнала. В качестве альтернативы можно использовать опрос (периодически просыпаться и проверять наличие другого элемента), но это будет менее эффективно. В любом случае вам нужно использовать блокировку, чтобы убедиться, что вы не пытаетесь изменить очередь в обоих потоках одновременно.

4 голосов
/ 16 апреля 2009

Это классическая проблема производителя / потребителя. Быстрый веб-поиск показывает http://msdn.microsoft.com/en-us/library/yy12yx1f(VS.80,loband).aspx,, который точно охватывает это.

Вы не хотите выполнять цикл while (true), поскольку ваш поток будет сжигать 100% ЦП, даже если для этого нет работы, что потенциально может привести к истощению других потоков в системе.

2 голосов
/ 16 апреля 2009

Если вы используете это из нескольких потоков, вам нужно будет ввести некоторую форму блокировки, чтобы избежать проблем с синхронизацией вашей очереди. Вы, вероятно, также должны убедиться, что ваша очередь заблокирована при постановке в очередь элементов, а также исключение из очереди элементов.

Если поток, обрабатывающий вашу очередь, не будет ничего делать, кроме этого, ваш базовый код, вероятно, в порядке, за исключением обработки случая, когда очередь в настоящее время пуста. Вы, вероятно, должны добавить что-то вроде:

while (myQueue.Count == 0)
    Thread.Sleep(sleepTime);

Это даст вам возможность "ждать", пока ваши события не заполнят вашу очередь.

Кроме того, когда вы выходите из очереди, вы не сможете использовать foreach. Вы хотите сделать что-то вроде:

while (myQueue.Count == 0)
    Thread.Sleep(sleepTime);
while (myQueue.Count > 0)
{
    lock(myLock)
        myObject = myQueue.Dequeue();
    // do your work...
}

Это предотвратит проблемы с изменением коллекции, если кто-то добавит в вашу очередь, и избавит вас от необходимости блокировать все время обработки ваших элементов.


Редактировать: Я согласен с некоторыми комментариями, что это не всегда самая эффективная стратегия.

Если очередь будет в значительной степени пустой и в некоторых случаях будет иметь элементы, я создам собственный класс, который обернет ее.

Вы можете вызвать событие, когда очередь станет пустой / непустой. Как только очередь получает элементы, событие может инициировать поток для его обработки и очистки. Как только он достигнет 0 элементов, он может иметь событие, чтобы остановить обработку.

Это было бы намного эффективнее, чем постоянное ожидание, если в большинстве случаев состояние очереди пустое. С другой стороны, если очередь заполнена почти постоянно, а поток обработки будет редко поддерживаться, для простоты я использовал бы описанный выше подход.

1 голос
/ 16 апреля 2009

Я обычно использую ManualResetEvent, чтобы сигнализировать, что элемент был добавлен в коллекцию.

Обработчик событий делает что-то вроде этого:

lock (myLock)
{
   myq.Enqueue(...);
   myqAddedSignal.Set();
}

Поток обработки ожидает сигнала - после того, как он дал сигнал, он очищает очередь, сбрасывает сигнал и обрабатывает элементы:

while (true)
{
   myqAddedSignal.WaitOne();
   lock (myLock)
   {
      // pull everything off myQ into a list
      // clear myQ
      myqAddedSignal.Reset();
   }

   foreach (obj in copyOfMyQ)
   {
      ...
   }
}

Это обработает элементы в очереди потокобезопасным способом. Единственное общее состояние - myqAddedSignal - доступ к нему синхронизируется с myLock (я обычно просто делаю это объектом).

0 голосов
/ 16 апреля 2009

Структура, которую вы ищете - семафор.

Вы добавляете элемент в очередь, затем добавляете счет в семафор.

Вы ожидаете семафор в другом потоке и обрабатываете элемент из очереди.

В зависимости от реализации очереди (например, BCL) вам придется блокировать / разблокировать при получении.

0 голосов
/ 16 апреля 2009

Следуя совету Рида, я создал собственный класс и генерировал события, когда очередь была пуста и заполнена.

Пользовательский EventQueue<T> Класс:

public class EventQueue<T> : Queue<T>
{
    public delegate void OnQueueMadeEmptyDelegate();
    public event OnQueueMadeEmptyDelegate OnQueueMadeEmpty;
    public delegate void OnQueueMadeNonEmptyDelegate();
    public event OnQueueMadeNonEmptyDelegate OnQueueMadeNonEmpty;

    public new void Enqueue(T item)
    {
        var oldCount = Count;
        base.Enqueue(item);
        if (OnQueueMadeNonEmpty != null &&
            oldCount == 0 && Count > 0)
            // FIRE EVENT
            OnQueueMadeNonEmpty();
    }
    public new T Dequeue()
    {
        var oldCount = Count;
        var item = base.Dequeue();
        if (OnQueueMadeEmpty != null &&
            oldCount > 0 && Count == 0)
        {
            // FIRE EVENT
            OnQueueMadeEmpty();
        }
        return item;
    }
    public new void Clear()
    {
        base.Clear();
        if (OnQueueMadeEmpty != null)
        {
            // FIRE EVENT
            OnQueueMadeEmpty();
        }
    }
}

(я удалил

для меньшего примера кода. Я использую модификатор "new" как способ добавить дополнительную логику к базовой логике).

Рядовые в основном классе:

public delegate void InitQueueDelegate();
private InitQueueDelegate initQueueDelegate;

private EventQueue<QueueRequest> translationQueue;
private Object queueLock = new Object();

В конструкторе основного класса:

initQueueDelegate = this.InitQueue;
initQueueDelegate.BeginInvoke(null, null);

В основном корпусе:

private void InitQueue()
{
    this.translationQueue = new EventQueue<QueueRequest>();
    this.translationQueue.OnQueueMadeEmpty += new EventQueue<QueueRequest>.OnQueueMadeEmptyDelegate(translationQueue_OnQueueMadeEmpty);
    this.translationQueue.OnQueueMadeNonEmpty += new EventQueue<QueueRequest>.OnQueueMadeNonEmptyDelegate(translationQueue_OnQueueMadeNonEmpty);
}

void translationQueue_OnQueueMadeNonEmpty()
{
    while (translationQueue.Count() > 0)
    {
        lock (queueLock)
        {
            QueueRequest request = translationQueue.Dequeue();
#if DEBUG
            System.Diagnostics.Debug.WriteLine("Item taken from queue...");
#endif
            // hard work
            ....
            ....
            ....
        }
    }
}

void translationQueue_OnQueueMadeEmpty()
{
    // empty queue
    // don't actually need to do anything here?
}

private void onMessageReceived(....)
{
  ....
  ....
  ....
  // QUEUE REQUEST
  lock (queueLock)
  {
    QueueRequest queueRequest = new QueueRequest
                                    {
                                        Request = request,
                                        Sender = sender,
                                        Recipient = tcpClientService
                                    };
    translationQueue.Enqueue(queueRequest);
#if DEBUG
    System.Diagnostics.Debug.WriteLine("Item added to queue...");
#endif
  }
}

И, наконец, структура QueueRequest:

public struct QueueRequest
{
    public MessageTranslateRequest Request { get; set; }
    public TCPClientService Sender { get; set; }
    public TCPClientService Recipient { get; set; }
}

Я знаю, что там много всего, но хотел, чтобы вы, ребята, отметили полную реализацию. Как вы думаете? Я правильно выполнил блокировку?

Я награжу Рида кредитом, если все в порядке, поскольку мое решение было создано из его идей.

0 голосов
/ 16 апреля 2009

Вы могли видеть этот существующий ответ , у которого есть такая очередь.

0 голосов
/ 16 апреля 2009

То, что ты там делаешь, выглядит неправильно. Если вы действительно используете очередь, то вам действительно нужно вытягивать элементы из очереди, а не перебирать ее:

while (!queue.empty()) // or whatever
{
  process the first item in the queue
}
...