Сценарий, над которым я работаю, состоит из 3 Observables.
StartObs : этот Observable испускается, когда мне нужно запустить последовательность обработок - испускаемые данные представляют собой processID процесса, который мне нужно выполнить
DoStuffObs : этот Observable испускает команд , на которых ядолжен что-то сделать - я хочу начать слушать такое Observable сразу после того, как StartObs испустил, и мне нужно processID процесса, чтобы выполнить свои обязанности сfunction doTheWork(command, processId)
EndObs : эта наблюдаемая функция выдается, когда мне нужно закончить обработку определенного processID и вернуться кпрослушайте следующую эмиссию StartObs
Так что в основном это: Start , DoStuff до End и затем возврат назаддля прослушивания следующего Start .
Также гарантируется, что после одного Начало наступает раньше или позже Конец и что невозможно иметь 2 Начало без Конца между или 2 Конца без Начало в между.
Первые 2 шага могут быть достигнуты через switchMap
, например
StartObs
.switchMap(processId => DoStuff.map(command => ({command, processId})))
.tap(data => doTheWork(data.command, data.processId))
Что мне не ясно, так этокак бороться с EndObs .
Опция использования takeUntil
не работает, так как я не хочу завершать цепочку, начатую с StartObs , так как мне нужно вернуться к прослушиванию для запуска следующего процесса.