Я должен сказать, что в качестве «простого примера» RxJS вы наверняка выбрали один с неуклюжей бизнес-логикой. :) (Например, другой крайний случай - это когда пользователь меняет «с» и уже изменил время, остается ли время тем же и обновляется «до»?).
Я стараюсь избегать BehaviorSubjects до тех пор, пока мне абсолютно не придется их использовать, поэтому сейчас давайте начнем с Subjects и посмотрим, работает ли это:
from = new Subject<string>();
to = new Subject<string>();
time = new Subject<string>();
from
устанавливается только с его входа (я полагаю, пользовательский интерфейс). Итак, давайте просто определим его как Observable без дополнительной обработки:
from$ = this.from.asObservable()
to
устанавливается из его входа или из потока «timeFrom» (извините, не могу найти лучшего имени):
timeFrom$ = combineLatest(this.time, this.from).pipe(
map([time, from] => time - from) // I'm leaving out formatting in this example
)
Прелестность CombineLatest в том, что он удовлетворяет вашему требованию, что мы используем timeFrom, только если значение «from» имеет значение, так как CombineLatest не излучает, если оба потока не отправили. Итак, наш to$
поток становится:
to$ = merge(
this.to,
this.timeFrom$
)
Точно так же мы можем определить время как поступающее либо из его входа, либо из потока «toFrom»:
toFrom$ = combineLatest(this.to, this.from).pipe(
map([to, from] => from - to)
)
И это приводит к:
time$ = merge(
this.time,
this.toFrom$
)
Разбивая проблему на 5 потоков вместо 3, мы теперь избегаем бесконечного цикла, о котором вы упомянули.
Надеюсь, это имело смысл. Для простоты я пропустил форматирование времени, которое у вас было.