C # Threading для обработки очереди сообщений - PullRequest
0 голосов
/ 27 июня 2018

У меня есть следующие требования -

  1. Поток, который получает сообщения и помещает их в очередь.
  2. Поток, который обрабатывает помещенные в очередь сообщения.

Теперь второй поток всегда должен быть живым - для этого я использовал бесконечный цикл while следующим образом:

private AutoResetEvent messageReset;
private Queue<byte[]> messageQueue;

//thread 2 method
private void ProcessIncomingMessages()
{
 messageReset.WaitOne(); //wait for signal
 while(true)
 {
    if (messageQueue.Count > 0)
    {
        //processing messages
    }
  }
}
public void SubmitMessageForProcessing(byte[] message){
            messageQueue.Enqueue(message); //enqueue message

            // Release the thread
            messageReset.Set();
}

Теперь, этот бесконечный цикл while очень сильно загружает процессор. Есть ли обходной путь для снижения загрузки процессора

ПРИМЕЧАНИЕ. Я не могу добавить оператор thread.sleep, поскольку входящие сообщения должны отображаться в пользовательском интерфейсе с минимальной задержкой.

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Просто используйте BlockingCollection вместо Queue. Это потокобезопасно и будет блокироваться на Take, пока какой-нибудь рабочий не добавит элемент:

// Use default constructor to make BlockingCollection FIFO
private BlockingCollection<byte[]> messageQueue = new BlockingCollection<byte[]>();

//thread 2 method
private void ProcessIncomingMessages()
{
    while (true)
    {
        //will block until thread1 Adds a message
        byte[] message = messageQueue.Take();

        //processing messages
    }
}
public void SubmitMessageForProcessing(byte[] message)
{
    messageQueue.Add(message); //enqueue message
}

EDIT2: я забыл упомянуть, что при использовании конструктора по умолчанию BlockingCollection будет FIFO. На самом деле он будет использовать ConcurrentQueue в качестве контейнера предметов.

Если вы хотите, чтобы BlockingCollection вел себя как коллекция LIFO, вам нужно было бы передать IProducerConsumerCollection, который является LIFO, в конструктор. Обычный класс для этого будет ConcurrentStack


РЕДАКТИРОВАТЬ: Некоторое объяснение того, как ваш Queue не является потокобезопасным, и это может привести к проблемам с вашим текущим кодом.

Из документации Microsoft по Queue:

Очередь может одновременно поддерживать несколько читателей, если коллекция не изменена.

Это означает, что вы не можете читать и писать из нескольких потоков одновременно.

Посмотрите на следующий пример, который также относится к другим ответам, которые предлагают просто переместить messageReset.WaitOne() в ваш блок while(true).

  1. SubmitMessageForProcessing вызывается и сигнализирует messageReset.Set()
  2. Поток 2 становится активным и пытается прочитать данные.
  3. Пока поток 2 читает данные, SubmitMessageForProcessing вызывается второй раз.
  4. Теперь вы пишете и читаете одновременно, что приводит к неожиданному поведению (как правило, к исключениям)
0 голосов
/ 27 июня 2018

В вашем примере цикл while будет занят, пока в очереди не будет хотя бы одного элемента. Вы можете переместить сигнал в этот цикл, чтобы уменьшить время ожидания и использовать меньше ресурсов процессора.

private void ProcessIncomingMessages()
{
    while(true)
    {
        messageReset.WaitOne(100); //wait for signal
        while (messageQueue.Count > 0)
        {
            //processing messages
        }
    }
}

P.S. Если у вас нет какого-то специального механизма блокировки, вы должны использовать ConcurrentQueue<T> вместо Queue<T>, если хотите быть поточно-ориентированным. Кроме того, я установил тайм-аут для вызова WaitOne, поскольку существует небольшая вероятность того, что сигнал будет установлен после проверки счетчика, но до того, как будет достигнут вызов WaitOne. В вашем решении могут быть другие проблемы с многопоточностью. Если вы не уверены в проблемах с многопоточностью, вы можете использовать BlockingCollection , который позаботится о многих деталях для вас.

...