Array Оператор соединения ish в Rx - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть Observable<EntityId[]>, и я могу для каждого EntityId получить Observable<Entity>.Я хочу превратить его в Observable<Entity[]>.Следующее решение решает это простым способом:

idsStream
    .Select(ids => ids
        .Select(id => StreamEntity(id))
        .CombineLatest())
    .Switch()

Проблема в том, что каждый раз, когда idsStream испускает несколько новых идентификаторов, каждая подписка сущности будет закрыта, а новые будут открыты.Я только хочу закрыть Observables, которые больше не содержатся в EntityId[], и открыть новые потоки для добавленных идентификаторов.

Мне кажется, для этого должен быть оператор, но я ничего не могу найти.Кто-нибудь знает такого оператора, или я должен составить / реализовать свой собственный.Если это так, следующее кажется идеей

// Only select for new elements and reuses former values
SelectAndKeep: Observable<TIn[]> -> (TIn -> TOut) -> Observable<TOut[]>

// Only subscribes/unsubscribe to new/removed observables
SwitchKeepAndCombineLatest: Observable<Observable<T>[]> -> Observable<T[]>

Observable<EntityId[]> idsStream

Observable<Observable<Entity>[]> entitiesStreamStream = 
    idsStream.SelectAndKeep(id => StreamEntity(id))

Observable<Entity[]> entitiesStream = entitiesStreamStream
    .SwitchKeepAndCombineLatest()

Есть какие-нибудь мысли или опыт по этому сценарию?Это кажется довольно общим, и поэтому хорошее решение должно существовать ...

1 Ответ

0 голосов
/ 08 декабря 2018

Основываясь на моем понимании вашего вопроса, я предлагаю вам эту реализацию:

interface Entity {
  id: number;
  title: string;
}
const entities: Entity[] = [{id: 1, title: 'foo1'}, {id: 2, title: 'foo2'}, {id: 3, title: 'foo3'}];

const entities$ = of(entities); // Observable of Entity[]

const ids$ = of([1,2], [1,3]); // Observable of ids.

ids$.pipe(mergeMap(ids => { 
  // Transform collection of id to collection of entity
  return entities$.pipe(map((entities) => {
      // from whole collection of entity, let keep what is present on ids.
      return entities.filter(entity => ids.findIndex(id => entity.id === id) !== -1);
  }));
})).subscribe(console.log);

entities$ Observable должно быть Hot observable (как BehaviorSubject), чтобы всегда получать последние объекты, доступные в вашем магазине.mergeMap параллельное объединение и сопоставление всех новых данных в вашем потоке ids$ с коллекцией сущностей, отфильтрованных по вашей исходной коллекции идентификаторов.

живой пример

...