Я немного не уверен в том, поможет ли это, но это метод ToObservable (), который я использую для чтения потоков.
public static class ObservableApmExtensions
{
public static IObservable<byte> ToObservable(this FileStream source)
{
return source.ToObservable(4096, Scheduler.CurrentThread);
}
public static IObservable<byte> ToObservable(this FileStream source, int buffersize, IScheduler scheduler)
{
return Observable.Create<byte>(o =>
{
var initialState = new StreamReaderState(source, buffersize);
var subscription = new MultipleAssignmentDisposable();
Action<StreamReaderState, Action<StreamReaderState>> action =
(state, self) =>
{
subscription.Disposable = state.ReadNext()
.Subscribe(
bytesRead =>
{
for (int i = 0; i < bytesRead; i++)
{
o.OnNext(state.Buffer[i]);
}
if (bytesRead > 0)
self(state);
else
o.OnCompleted();
},
o.OnError);
};
var scheduledAction = scheduler.Schedule(initialState, action);
return new CompositeDisposable(scheduledAction, subscription);
});
}
private sealed class StreamReaderState
{
private readonly int _bufferSize;
private readonly Func<byte[], int, int, IObservable<int>> _factory;
public StreamReaderState(Stream source, int bufferSize)
{
_bufferSize = bufferSize;
_factory = Observable.FromAsyncPattern<byte[], int, int, int>(source.BeginRead, source.EndRead);
Buffer = new byte[bufferSize];
}
public IObservable<int> ReadNext()
{
return _factory(Buffer, 0, _bufferSize);
}
public byte[] Buffer { get; set; }
}
}
Я не пробовал это с NetworkStream, но похоже, что вы можете поменять чек
if (bytesRead > 0)
до
if (source.DataAvailable)
Затем вам также необходимо изменить тип источника на NetworkStream.
Я думаю, что планирование в моем коде может помочь вам с вашими проблемами блокировки. Другой вариант, если уместно (я все еще не совсем понимаю вашу проблему), вы могли бы использовать
.Включите и создайте вложенную наблюдаемую.
Это будет означать, что когда некоторые данные поступают через вас, вы читаете их все до тех пор, пока они не будут сделаны, а затем завершите. Когда вы закончите, вы начнете другую последовательность, которая будет иметь дальнейшие данные.
s1 --1-0-1-1|
s2 ---1-0-0-1-|
s3 ---0-0-1-0-1|
etc..
out--1-0-1-1---1-0-0-1----0-0-1-0-1|
s1, s2, s3 и т. Д. - это последовательности, представляющие собой пакет данных до потока. DataAvailable. Затем эти внутренние потоки будут завершены, и начнется запрос (создание другой внутренней наблюдаемой последовательности s2, s3, sN). Все коммутаторы (или Merge или Concat) смогут объединить эти несколько последовательностей в одну для пользователей.
Еще одна альтернатива, которая может быть проще в написании кода, - это IEnumerable >. Их легко создать с помощью такого метода, как этот
public IEnumerable<IObservable<byte>> ConstantRead(string path)
{
while (true)
{
yield return Observable.Using(
() => new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None),
stream => stream.ToObservable(4096, Scheduler.ThreadPool));
}
}
Изменение требований сетевого потока.
Затем вы просто расправляетесь
_subscription = ConstantRead(@"C:\Users\Lee\MyFile.zip")
.Concat()
.Subscribe(...
Надеюсь, это поможет.
Ли Кэмпбелл