Асинхронный администратор очередей - PullRequest
2 голосов
/ 13 марта 2012

Я столкнулся с проблемой при написании асинхронного многосерверного сетевого приложения на c #.У меня есть много заданий, выполняемых пулом потоков, в том числе записи в сетевые сокеты.Это позволило учесть случай, когда более чем один поток мог одновременно писать в сокет и сбивать мои исходящие сообщения.Моя идея обойти это состояла в том, чтобы реализовать систему очередей, в которой всякий раз, когда данные добавлялись в очередь, сокет записывал их.

Моя проблема в том, что я не могу полностью обернуться вокруг архитектуры чего-либоэтой природы.Я представляю себе объект очереди, который запускает событие всякий раз, когда данные добавляются в очередь.Затем событие записывает данные, хранящиеся в очереди, но это не сработает, потому что, если два потока приходят и добавляют в очередь одновременно, даже если очередь создана для обеспечения безопасности потока, события все равно будут запускаться для обоих иЯ столкнусь с той же проблемой.Так что, может быть, каким-то образом отложить событие, если другое происходит, но как мне продолжить это событие, когда первое завершится, без простой блокировки потока на каком-нибудь мьютексе или чем-то еще.Это не было бы так сложно, если бы я не пытался оставаться строгим с моей архитектурой «ничего не блокировать», но это конкретное приложение требует, чтобы я позволял потокам пула потоков продолжать делать свое дело.

Есть идеи?

Ответы [ 4 ]

4 голосов
/ 13 марта 2012

Хотя, как и в случае ответа Porges, реализация немного отличается.

Во-первых, я обычно не ставлю байты в очередь на отправку, но объекты и сортирую их в потоке отправки, но я думаю, что это дело вкуса. Но большая разница заключается в использовании ConcurrentQueues (в дополнение к BlockingCollection). Так что я бы получил код, похожий на

        BlockingCollection<Packet> sendQueue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
        while (true)
        {
            var packet = sendQueue.Take(); //this blocks if there are no items in the queue.
            SendPacket(packet); //Send your packet here.
        }

Ключ к выводу здесь заключается в том, что у вас есть один поток, который зацикливает этот код, и все остальные потоки могут добавлять в очередь потокобезопасным способом (и BlockingCollection, и ConcurrentQueue являются потокобезопасными)

взгляните на Асинхронная обработка очереди элементов в C # , где я ответил на аналогичный вопрос.

4 голосов
/ 13 марта 2012

Похоже, вам нужен синхронный поток для одного потока в сокет и несколько потоков для записи в очередь для обработки этого потока.

Вы можете использовать блокирующую коллекцию (BlockingCollection<T>) для выполнения тяжелой работы:

// somewhere there is a queue:

BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>();

// in socket-writing thread, read from the queue and send the messages:

foreach (byte[] message in queue.GetConsumingEnumerable())
{
    // just an example... obviously you'd need error handling and stuff here
    socket.Send(message);
}

// in the other threads, just enqueue messages to be sent:

queue.Add(someMessage);

BlockingCollection будет обрабатывать всю синхронизацию. Вы также можете установить максимальную длину очереди и другие забавные вещи.

0 голосов
/ 13 марта 2012

У вас может быть потокобезопасная очередь, в которую все ваши рабочие потоки записывают свои результаты.Затем создайте другой поток, который опрашивает очередь и отправляет результаты, когда видит, что они ожидают.

0 голосов
/ 13 марта 2012

Я не знаю C #, но я бы сделал так, чтобы событие вызывало диспетчер сокетов, чтобы он начал вытаскивать из очереди и записывать вещи по одному.Если он уже работает, триггер не будет ничего делать, и если в очереди ничего нет, он останавливается.

Это решает проблему одновременной записи двух потоков в очередь, поскольку второе событие будетнет оп.

...