Если вы готовы ограничить ваш параллелизм скоростью самого медленного потока, то это довольно просто и не требует дополнительного буфера, я записал его из памяти, поэтому, возможно, немного испортил его.
Кроме того, он не реализует все необходимые переопределения, такие вещи, как CanRead, CanSeek и т. П., Все пропущены для простоты, только основная параллельная запись выполняется.
using System;
using System.IO;
using System.Threading;
namespace MultiCastStream
{
public class MultiWriteStream : Stream
{
private readonly Stream[] streams;
private AutoResetEvent[] waits;
private readonly IAsyncResult[] results;
public MultiWriteStream(params Stream[] streams)
{
this.streams = (Stream[])streams.Clone();
this.results = new IAsyncResult[this.streams.Length];
}
private void prepWaits()
{
if (waits == null)
{
this.waits = new AutoResetEvent[this.streams.Length];
for(int i= 0; i < this.waits.Length; i++)
{
this.waits[i] = new AutoResetEvent(false);
}
}
else
{
for(int i= 0; i < this.waits.Length; i++)
{
this.waits[i].Reset();
}
}
}
public override void Write (byte[] buffer, int offset, int count)
{
prepWaits();
for( int i = 0; i < this.streams.Length; i++)
{
this.results[i] = streams[i].BeginWrite(
buffer, offset, count, this.Release, waits[i]);
}
WaitHandle.WaitAll(waits);
for( int i = 0; i < this.streams.Length; i++)
{
this.results[i] = this.streams[i].EndWrite(results[i]);
}
}
private void Release(IAsyncResult result)
{
((AutoResetEvent)result.AsyncState).Set();
}
public override void WriteByte (byte value)
{
// no point doing this asynchronously
foreach (Stream s in this.streams)
s.WriteByte (value);
}
protected override void Dispose (bool disposing)
{
base.Dispose (disposing);
if (this.waits != null)
{
foreach (AutoResetEvent w in this.waits)
w.Close();
}
foreach (Stream s in this.streams)
s.Dispose();
}
}
}