Обработка участников очереди - PullRequest
1 голос
/ 18 октября 2011

У меня есть поток, который заполняет очередь.И у меня есть другой поток, который обрабатывает эту очередь.Моя проблема в том, что первый поток заполняет очередь очень быстро, поэтому другой поток не может обработать эту очередь намного быстрее, и моя программа продолжает чрезмерное использование памяти.Каково оптимальное решение этой проблемы?

Извините, я забыл что-то добавить.Я не могу ограничить свою очередь или поток продюсера.Мой продюсерский поток не мог ждать, потому что он захватывает сетевые пакеты, и я не должен пропустить ни одного пакета.Я должен обрабатывать эти пакеты быстрее, чем поток производителя.

Ответы [ 5 ]

0 голосов
/ 18 октября 2011

Самое простое возможное решение состоит в том, что поток производителя проверяет, достиг ли очередь определенного предела ожидающих элементов, если это так, то переходит в режим сна, прежде чем продвигать больше работы. Другие решения зависят от того, какую реальную проблему вы пытаетесь решить, является ли обработка большего количества операций ввода-вывода или процессора и т. Д., Что даже позволит вам разработать решение, которое даже не нуждается в очереди. Например: поток производителя может генерировать, скажем, 10 элементов и вызывать другой потребительский «метод», который обрабатывает их параллельно и т. Д.

0 голосов
/ 18 октября 2011

Другой метод заключается в том, чтобы при запуске создавать новые () X межпотоковые экземпляры связи, помещать их в очередь и больше никогда не создавать. Поток A извлекает объекты из этой очереди пула, заполняет их данными и помещает их в очередь в поток B. Поток B получает объекты, обрабатывает их и затем возвращает их в очередь пула.

Это обеспечивает управление потоком - если поток A попытается отправить слишком быстро, пул высохнет, и A придется ждать в очереди пула, пока B не вернет объекты. У него есть потенциал для улучшения производительности, поскольку после начального заполнения пула нет malloc и освобождается - время блокировки в очереди push / pop будет меньше, чем при вызове диспетчера памяти. Нет необходимости в сложных ограниченных очередях - подойдет любой старый класс очереди производитель-потребитель. Пул можно использовать для обмена сообщениями между потоками в полном приложении с множеством потоков / threadPools, поэтому все они контролируются потоком. Проблемы с отключением могут быть смягчены - если очередь пула создается основным потоком при запуске перед любыми формами и т. Д. И никогда не освобождается, часто можно избежать явного отключения фоновых потоков при закрытии приложения - боль, о которой было бы неплохо просто забыть о , Утечки объектов и / или двойные выбросы легко обнаруживаются с помощью мониторинга уровня пула («обнаружено», а не «исправлено»:).

Неизбежные недостатки - вся память экземпляра межпотоковой связи постоянно выделяется, даже если приложение полностью бездействует. Объект, извлеченный из пула, будет полон «мусора» от его предыдущего использования. Если «самый медленный» поток получает объект до того, как его освободить, приложение может зайти в тупик с пустым пулом, и все объекты будут поставлены в очередь в самом медленном потоке. Очень тяжелый всплеск загрузки может привести к тому, что приложение будет «задушить» себя «рано», когда более простой механизм «new / queue / dispose» просто выделит больше экземпляров и, таким образом, будет лучше синхронизироваться с пакетом работы.

Rgds, Martin

0 голосов
/ 18 октября 2011

Вы можете определить максимальный размер очереди (скажем, 2000), который при попадании заставляет очередь принимать больше элементов только тогда, когда ее размер меньше (например, 1000).

Я бы порекомендовал использовать EventWaitHandle или ManualResetEvent, чтобы не ожидать занятости. http://msdn.microsoft.com/en-us/library/system.threading.manualresetevent.aspx

0 голосов
/ 18 октября 2011

Если вы еще этого не делаете, используйте BlockingCollection<T> в качестве своей очереди и передайте некоторое разумное ограничение для параметра boundedCapacity конструктора (которое затем отражается в свойстве BoundedCapacity) - ваш производитель заблокирует Add если это сделает очередь слишком большой и возобновит работу после того, как потребитель удалит какой-либо элемент из очереди.

Согласно документации MSDN для BlockingCollection<T>.Add:

Если при инициализации этого экземпляра BlockingCollection<T> была указана ограниченная емкость, вызов Add может блокироваться, пока не будет доступно место для хранения предоставленного элемента.

0 голосов
/ 18 октября 2011

Что ж, предполагая, что порядок обработки элементов в очереди не важен, вы можете запустить два (или более) потока, обрабатывающих очередь.

Если между ними нет какого-либо разногласия, это должно ускорить обработку. Это известно как модель для нескольких потребителей.

Другая возможность состоит в том, чтобы ваш поток производителя отслеживал размер очереди и отказывался добавлять записи, пока она не опустится ниже некоторого порогового значения. Стандартные очереди C # не обеспечивают способ остановить расширение емкости (даже использование коэффициента роста 1,0 не будет препятствовать росту).

...