Rx Buffer работает один раз и завершен - PullRequest
0 голосов
/ 27 января 2020

Мне нравится собирать первые значения потока в течение 3 секунд и подключать их к другому потоку того же типа данных. Первый Observable - это отдельный законченный блок данных, который должен завершиться в любом случае, чтобы я мог подключиться первым и вторым с помощью Concat (). Использование Concat () необходимо для сохранения целостности потока данных.

            private IObservable<3DPoints> _1stBuffer = Observable.Empty<3DPoints>();
            ..

           _1stBuffer = someRawStreamObservableReceivingOnNext
                .Buffer(TimeSpan.FromSeconds(3), 100)
                .Where(item => item.Any())
                .SelectMany(item => item);


// later 
            var streamObservable = _1stBuffer.Concat(_some2ndStream); // doesn't emit, since 1stBuffer doesn't complete

Я пробовал это:

            _1stBuffer = someRawStreamObservableReceivingOnNext

                // completes the observable, but i want the buffer so far, no emptiness !
                .TimeOut(TimeSpan.FromSeconds(3), Observable.Empty<3DPoints>)

                .Buffer(TimeSpan.FromSeconds(3), 100)
                .Where(item => item.Any())
                .SelectMany(item => item);

Тайм-аут делает bufferObservable выполняя Concat (), но продолжая Observable. Пустые <3DPoints> не вариант. Как я могу получить 1-й буфер, заполненный за 3 секунды, и объединить их?

1 Ответ

1 голос
/ 28 января 2020

Возможно, вы ищете один из вариантов Take:

  • source.Take(1) получит первый элемент из источника, затем завершит.
  • source.TakeUntil(otherObservable) будет извлекаться из источника до тех пор, пока не появится первый элемент из других наблюдаемых.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...