Мне нравится собирать первые значения потока в течение 3 секунд и подключать их к другому потоку того же типа данных. Первый Observable - это отдельный законченный блок данных, который должен завершиться в любом случае, чтобы я мог подключиться первым и вторым с помощью Concat (). Использование Concat () необходимо для сохранения целостности потока данных.
private IObservable<3DPoints> _1stBuffer = Observable.Empty<3DPoints>();
..
_1stBuffer = someRawStreamObservableReceivingOnNext
.Buffer(TimeSpan.FromSeconds(3), 100)
.Where(item => item.Any())
.SelectMany(item => item);
// later
var streamObservable = _1stBuffer.Concat(_some2ndStream); // doesn't emit, since 1stBuffer doesn't complete
Я пробовал это:
_1stBuffer = someRawStreamObservableReceivingOnNext
// completes the observable, but i want the buffer so far, no emptiness !
.TimeOut(TimeSpan.FromSeconds(3), Observable.Empty<3DPoints>)
.Buffer(TimeSpan.FromSeconds(3), 100)
.Where(item => item.Any())
.SelectMany(item => item);
Тайм-аут делает bufferObservable выполняя Concat (), но продолжая Observable. Пустые <3DPoints> не вариант. Как я могу получить 1-й буфер, заполненный за 3 секунды, и объединить их?