Как скопировать поток во многие потоки async C # .NET - PullRequest
3 голосов
/ 08 ноября 2011

У меня есть TCP-сервер, который принимает большие данные без перерыва. И мне нужно транслировать этот поток многим клиентам.

UPDATE: Мне нужно транслировать видео поток. Возможно, есть готовые решения?

Ответы [ 3 ]

2 голосов
/ 08 ноября 2011

Если вы хотите сделать это асинхронно, тогда вы можете воспользоваться System.Threading.Tasks пространством имен .

Сначала вам понадобится карта Stream экземпляров с Task, которую можно ждать для завершения:

IDictionary<Stream, Task> streamToTaskMap = outputStreams.
    ToDictionary(s => s, Task.Factory.StartNew(() => { });

В вышеприведенном тексте есть небольшие издержки, в том смысле, что есть потраченный впустую экземпляр Task, который ничего не делает, но эта цена мала, учитывая количество Task экземпляров и продолжений, которые вам нужно выполнить.

Оттуда вы читаете содержимое из потока и затем асинхронно записываете его в каждый из Stream:

byte[] buffer = new byte[<buffer size>];
int read = 0;

while ((read = inputStream.Read(buffer, 0, buffer.Length)) > 0)
{
    // The buffer to copy into.
    byte[] copy = new byte[read];

    // Perform the copy.
    Array.Copy(buffer, copy, read);

    // Cycle through the map, and replace the task with a continuation
    // on the task.
    foreach (Stream stream in streamToTaskMap.Keys)
    {
        // Continue.
        streaToTaskMap[stream] = streaToTaskMap[stream].ContinueWith(t => {
            // Write the bytes from the copy.
            stream.Write(copy, 0, copy.Length);
        });
    }
}

И, наконец, вы можете дождаться всех записанных потоков, позвонив по номеру:

Task.WaitAll(streamToTaskMap.Values.ToArray());

Есть несколько вещей, на которые стоит обратить внимание.

Во-первых, требуется копия buffer из-за лямбды, которая передается в ContinueWith; лямбда - это замыкание, которое инкапсулирует buffer, и поскольку оно обрабатывается асинхронно, содержимое может измениться. Каждому продолжению нужна своя копия буфера для чтения.

Также для вызова Stream.Write используется свойство Array.Length; в противном случае переменная read должна была бы копироваться через каждую итерацию цикла.

Кроме того, было бы более целесообразно использовать методы BeginWrite / EndWrite в классе Stream; поскольку не существует метода ContinueWithAsync, который будет принимать Task и продолжать работу с асинхронным методом, нет смысла вызывать асинхронные версии read.

Это один из тех случаев, когда может быть лучше вызвать BeginWrite / EndWrite самостоятельно (а также BeginRead / EndRead), чтобы максимально использовать асинхронных операций; конечно, это будет немного сложнее, потому что у вас не будет инкапсуляции результата операции, который предоставляет Task, и вам придется принять те же меры предосторожности с buffer, если вы используете анонимные методы / замыкания.

1 голос
/ 08 ноября 2011

Создает поток, передающий ему поток, а также потоки для записи в

byte[] buffer = new byte[BUFFER_SIZE];
int btsRead = 0;

while ((btsRead = inputStream.Read(buffer, 0, BUFFER_SIZE)) > 0)
{
    foreach (Stream oStream in outputStreams)
        oStream.Write(buffer, 0, btsRead);
}

РЕДАКТИРОВАТЬ: Чтобы распараллелить записи:

замените блок foreach на:

Parallel.ForEach(outputStreams, oStream =>
{
    oStream.Write(buffer, 0, btsRead);
});
0 голосов
/ 08 ноября 2011

По сути, вы хотите создать поток для своего приложения. Вот простой пример многопоточности и TCP / IP

C # Учебное пособие - Простой многопоточный TCP-сервер | Включить код

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...