Есть неоднозначный оператор Amb
, который гоняет две последовательности, чтобы увидеть, кто победит первым.
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
Поток победителей продолжает распространяться до конца - мы этого не хотим. Мы заинтересованы в том, чтобы снова начать гонку за следующей ценностью. Давайте сделаем это:
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
.Take(1)
.Repeat()
Теперь последняя проблема заключается в том, что DistinctUntilChanged
теряет свое состояние каждый раз, когда мы перезапускаем гонку, и его поведение равно pu sh самому первому значению, которое он получает немедленно. Давайте исправим это, превратив его в горячую наблюдаемую.
logSubject.DistinctUntilChanged().Publish();
Собираем все вместе:
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged().Publish(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(5)); // log at least every 5 seconds
var oneof =
Observable
.Amb(logMessagesChangedStream, logMessagesSampleStream)
.Take(1)
.Repeat();
logMessagesChangedStream.Connect();
oneof.Timestamp().Subscribe(c => Console.WriteLine(c));
Изменил 10 секунд на 5, , потому что .
Тест
Action<string> onNext = logSubject.OnNext;
onNext("a");
onNext("b");
Delay(1000, () => { onNext("c"); onNext("c"); onNext("c"); onNext("d"); });
Delay(3000, () => { onNext("d"); onNext("e"); });
Delay(6000, () => { onNext("e"); });
Delay(10000, () => { onNext("e"); });
Выход
a@1/30/2020 12:17:52 PM +00:00
b@1/30/2020 12:17:52 PM +00:00
c@1/30/2020 12:17:53 PM +00:00
d@1/30/2020 12:17:53 PM +00:00
e@1/30/2020 12:17:55 PM +00:00
e@1/30/2020 12:18:00 PM +00:00
e@1/30/2020 12:18:05 PM +00:00