Асинхронный. читать из блоков NetworkStream в течение 60 секунд. (Rx) - PullRequest
0 голосов
/ 25 ноября 2011

Когда в потоке нет данных, и я пытаюсь прочитать блоки потока в течение 60 секунд. Когда есть некоторые данные, чтение завершается по желанию. Как я могу переписать следующий код, чтобы он мог читать только когда stream.DataAvailable имеет значение true?

Мне кажется, мне нужно что-то вроде Observable.While (dataAvailableObserver, AsyncRead) ..

    public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
    {
        return Observable.Create<byte[]>(
            o => Observable.Defer(() => AsyncReadChunk(stream, bufferSize))
                     .Repeat()
                     .Subscribe(dataChunk =>
                                    {
                                        if (dataChunk.Length > 0)
                                        {
                                            o.OnNext(dataChunk);

                                            return;
                                        }

                                        Debug.Assert(!stream.DataAvailable);

                                        o.OnCompleted();
                                    }, o.OnError, o.OnCompleted));
    }

    public static IObservable<byte[]> AsyncReadChunk(this NetworkStream stream, int bufferSize)
    {
        var buffer = new byte[bufferSize];

        return Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead)(buffer, 0, bufferSize)
            .Select(cbRead =>
            {
                Console.WriteLine("Data chunk received.");

                var dataChunk = new byte[cbRead];

                Buffer.BlockCopy(buffer, 0, dataChunk, 0, cbRead);

                return dataChunk;
            });
    }

Я обнаружил, что нужно читать в небольших буферных размерах, так как большие буферы вызывают ожидание заполнения буфера (как в моем сценарии, когда входящие данные представляют собой небольшие пакеты).

Ответы [ 2 ]

0 голосов
/ 14 февраля 2012

Я немного не уверен в том, поможет ли это, но это метод ToObservable (), который я использую для чтения потоков.

public static class ObservableApmExtensions
{
    public static IObservable<byte> ToObservable(this FileStream source)
    {
        return source.ToObservable(4096, Scheduler.CurrentThread);
    }

    public static IObservable<byte> ToObservable(this FileStream source, int buffersize, IScheduler scheduler)
    {
        return Observable.Create<byte>(o =>
        {
            var initialState = new StreamReaderState(source, buffersize);
            var subscription = new MultipleAssignmentDisposable();
            Action<StreamReaderState, Action<StreamReaderState>> action =
                (state, self) =>
                {
                    subscription.Disposable = state.ReadNext()
                        .Subscribe(
                            bytesRead =>
                            {
                                for (int i = 0; i < bytesRead; i++)
                                {
                                    o.OnNext(state.Buffer[i]);
                                }
                                if (bytesRead > 0)
                                    self(state);
                                else
                                    o.OnCompleted();
                            },
                            o.OnError);
                };

            var scheduledAction = scheduler.Schedule(initialState, action);
            return new CompositeDisposable(scheduledAction, subscription);
        });
    }

    private sealed class StreamReaderState
    {
        private readonly int _bufferSize;
        private readonly Func<byte[], int, int, IObservable<int>> _factory;

        public StreamReaderState(Stream source, int bufferSize)
        {
            _bufferSize = bufferSize;
            _factory = Observable.FromAsyncPattern<byte[], int, int, int>(source.BeginRead, source.EndRead);
            Buffer = new byte[bufferSize];
        }

        public IObservable<int> ReadNext()
        {
            return _factory(Buffer, 0, _bufferSize);
        }

        public byte[] Buffer { get; set; }
    }
}

Я не пробовал это с NetworkStream, но похоже, что вы можете поменять чек

if (bytesRead > 0)

до

if (source.DataAvailable)

Затем вам также необходимо изменить тип источника на NetworkStream.

Я думаю, что планирование в моем коде может помочь вам с вашими проблемами блокировки. Другой вариант, если уместно (я все еще не совсем понимаю вашу проблему), вы могли бы использовать .Включите и создайте вложенную наблюдаемую.

Это будет означать, что когда некоторые данные поступают через вас, вы читаете их все до тех пор, пока они не будут сделаны, а затем завершите. Когда вы закончите, вы начнете другую последовательность, которая будет иметь дальнейшие данные.

s1 --1-0-1-1|
s2          ---1-0-0-1-|        
s3                     ---0-0-1-0-1|
etc..
out--1-0-1-1---1-0-0-1----0-0-1-0-1|

s1, s2, s3 и т. Д. - это последовательности, представляющие собой пакет данных до потока. DataAvailable. Затем эти внутренние потоки будут завершены, и начнется запрос (создание другой внутренней наблюдаемой последовательности s2, s3, sN). Все коммутаторы (или Merge или Concat) смогут объединить эти несколько последовательностей в одну для пользователей.

Еще одна альтернатива, которая может быть проще в написании кода, - это IEnumerable >. Их легко создать с помощью такого метода, как этот

public IEnumerable<IObservable<byte>> ConstantRead(string path)
{
    while (true)
    {
        yield return Observable.Using(
                () => new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None),
                stream => stream.ToObservable(4096, Scheduler.ThreadPool));
    }
}

Изменение требований сетевого потока.

Затем вы просто расправляетесь

_subscription = ConstantRead(@"C:\Users\Lee\MyFile.zip")
                .Concat()
                .Subscribe(...

Надеюсь, это поможет.

Ли Кэмпбелл

0 голосов
/ 25 ноября 2011

Так как вы используете Defer, вы должны проверить доступные данные в вашей логике defer.Самый простой способ - выполнить проверку в методе AsyncReadChunk, например:

public static IObservable<byte[]> AsyncReadChunk(this NetworkStream stream, int bufferSize) 
{ 
    if (!stream.DataAvailable)
    {
        return Observable.Empty<byte[]>();
    }
    else
    {
        var buffer = new byte[bufferSize]; 

        return Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead)(buffer, 0, bufferSize) 
            .Select(cbRead => 
...