Вместо перезаписи Merge для завершения, когда завершается любой поток, я бы предложил преобразовать события onCompleted в события onNext и использовать var ss = s1.Merge(s2).TakeUntil(s1ors2complete)
, где s1ors2complete выдает значение, когда заканчивается s1 или s2.Вы также можете просто связать .TakeUntil(s1completes).TakeUntil(s2completes)
вместо создания s1ors2complete.Этот подход обеспечивает лучшую композицию, чем расширение MergeWithCompleteOnEither, поскольку его можно использовать для преобразования любого оператора «завершено при завершении» в оператор «завершено при любом завершении».
Как преобразовать события onNext в события onCompletedЕсть несколько способов сделать это.Метод CompositeDisposable звучит как хороший подход, и после небольшого поиска эта интересная тема о конвертируется между onNext, onError и onCompleted уведомлениями .Вероятно, я бы создал метод расширения ReturnTrueOnCompleted, используя xs.SkipWhile(_ => true).concat(Observable.Return(True))
, и тогда ваше слияние станет:
var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));
Вы также можете посмотреть на использование такого оператора, как Zip, который автоматически завершает , когда одиниз входных потоков завершается.