У меня есть Observable<EntityId[]>
, и я могу для каждого EntityId получить Observable<Entity>
.Я хочу превратить его в Observable<Entity[]>
.Следующее решение решает это простым способом:
idsStream
.Select(ids => ids
.Select(id => StreamEntity(id))
.CombineLatest())
.Switch()
Проблема в том, что каждый раз, когда idsStream испускает несколько новых идентификаторов, каждая подписка сущности будет закрыта, а новые будут открыты.Я только хочу закрыть Observables, которые больше не содержатся в EntityId[]
, и открыть новые потоки для добавленных идентификаторов.
Мне кажется, для этого должен быть оператор, но я ничего не могу найти.Кто-нибудь знает такого оператора, или я должен составить / реализовать свой собственный.Если это так, следующее кажется идеей
// Only select for new elements and reuses former values
SelectAndKeep: Observable<TIn[]> -> (TIn -> TOut) -> Observable<TOut[]>
// Only subscribes/unsubscribe to new/removed observables
SwitchKeepAndCombineLatest: Observable<Observable<T>[]> -> Observable<T[]>
Observable<EntityId[]> idsStream
Observable<Observable<Entity>[]> entitiesStreamStream =
idsStream.SelectAndKeep(id => StreamEntity(id))
Observable<Entity[]> entitiesStream = entitiesStreamStream
.SwitchKeepAndCombineLatest()
Есть какие-нибудь мысли или опыт по этому сценарию?Это кажется довольно общим, и поэтому хорошее решение должно существовать ...