Можно ли использовать ConcurrentQueue между двумя потоками? - PullRequest
1 голос
/ 21 июля 2010

Можно ли использовать ConcurrentQueue между двумя потоками? Я хотел проверить, мне не нужно явно где-то «блокировать». В частности, посмотрите на строки, где я в комментариях я бы бросил пакет здесь ...

public class PacketCapturer
{

private static ConcurrentQueue<Packet> _packetQueue = new ConcurrentQueue<Packet>();

public PacketCapturer(IPHostEntry proxyDns, ref BackgroundWorker bw)
{
    // start the background thread
    var backgroundThread = new System.Threading.Thread(BackgroundThread);
    backgroundThread.Start();

    // Start Packet Capture with callback via PacketCapturerCallback
}

private void PacketCapturerCallback(Packet packet)
{
    _packetQueue.Enqueue(packet);
}

private static void BackgroundThread()
{
    while (!BackgroundThreadStop)
    {
        if (_packetQueue.Count == 0) Thread.Sleep(250);
        else 
        {
            ConcurrentQueue<Packet> ourQueue;  
            ourQueue = _packetQueue;   // COULD I DROP A PACKET BETWEEN HERE
            _packetQueue = new ConcurrentQueue<Packet>();      // AND HERE???

            Console.WriteLine("BackgroundThread: ourQueue.Count is {0}", ourQueue.Count);
        }
    }
}

Ответы [ 3 ]

2 голосов
/ 21 июля 2010

Нет, не в порядке.Прежде всего, если вы измените ссылку таким образом в параллельном потоке, _packetQueue должен быть помечен как энергозависимый, чтобы оптимизации компилятора и кода не могли увидеть изменения.Вызвание _packetQueue shoudl обычно происходит как Interlocked.CompareExchange, но это менее критично для вашего использования.

Но для большего беспокойства есть шаблон изменения экземпляра packetQueue, подобный этому в фоновом потоке.Какова цель этого?У него ужасный запах кода ...

обновлено

Что я обычно делаю, это:

Поток (ы) производителя:

Producer () {
...
lock(_sharedQueue) {
  _sharedQueue.Enqueue(something);
}
...
}

потребительский поток:

consumer (...) {
...
var Something[] toProcess = null;
lock(_sharedQueue)
{
  toProcess = _sharedQueue.Toarray();
   _sharedQueue.Clear();
}
// Process here the toProcess array
...
}

Этого было достаточно для каждого отдельного использования, которое я когда-либо имел.Обработка не происходит под блокировкой, поэтому блокировка минимальна.Не нужно модного ConcurrentQueue, достаточно простой старой коллекции .Net 2.0.Обычно для хорошей практики я использую выделенный объект блокировки вместо того, чтобы блокировать фактический экземпляр очереди.

1 голос
/ 21 июля 2010

Я не думаю, что вы могли бы отбросить пакет, поскольку очевидно, что любой поток будет разыменовываться к или старому или новому. Крайний случай состоит в том, что вы получаете больше данных, поступающих в ourQueue после , по вашему мнению, вы их поменяли местами, или вы также можете не заметить изменение ссылки в цикле Sleep (но: с двумя потоками, единственный поток, который изменяет ссылку , это этот поток, поэтому не обязательно проблема).

TBH, если у вас есть 2 темы здесь, почему бы просто не lock или использовать ReaderWriterLock? Будет намного проще понять, что происходит, когда ...

1 голос
/ 21 июля 2010

Редактировать: Я думаю, что Марк / Гейб прав, что вы не потеряете ни одного пакета.Я оставлю все остальное только для справки, на случай, если кто-нибудь еще сможет взвесить это.


Просто да.Вы можете потерять один или несколько пакетов там.Возможно, вы захотите узнать, какие методы ConcurrentQueue предлагает для получения / удаления его части, поскольку это выглядит как то, что вы хотите сделать.

Почему бы не попробовать TryDequeue, пока он не вернет false:

    else 
    {
        Queue<Packet> ourQueue = new Queue<Packet>();  //this doesn't need to be concurrent unless you want it to be for some other reason.
        Packet p;
        while(_packetQueue.TryDequeue(out p))
        {
            ourQueue.Enqueue(p);
        }

        Console.WriteLine("BackgroundThread: ourQueue.Count is {0}", ourQueue.Count);
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...