Как я могу использовать Rx Js 6 GroupBy для организации потока? Необходимо объединить все выбросы от нескольких сгруппированных наблюдаемых - PullRequest
2 голосов
/ 05 августа 2020

Входной наблюдаемый поток: данные получены из наблюдаемого потока, который является результатом запроса REST для проектов. Данные получены как наблюдаемые .

     const project1: Project = {
        id: 1,
        title: 'zebra',
        rootId: 1,
      }
    
      const project2: Project = {
        id: 2,
        title: 'algebra',
        rootId: 2,
      }
    
      const project3: Project = {
        id: 3,
        title: 'Bobcats',
        rootId: 1,
      }
    
      const project4: Project = {
        id: 4,
        rootId: 2,
      }
    
      const project5: Project = {
        id: 5,
        title: 'Marigolds',
        rootId: 1,
      }
    
      const project6: Project = {
        id: 6,
        title: 'whatever',
        rootId: null,
      }
    
      const project7: Project = {
        id: 7,
        title: 'peppercorns',
        rootId: null,
      }
    
    let groupProjects: Observable<ProjectSummary[]> 
= getGroupProjects(of([project1, project2, project3, project4, project5, project6, project7]]));
    
      getGroupProjects(projects$: Observable<ProjectSummary[]>): Observable<ProjectSummary[]> {
        const timer$ = timer(5000);
        const data = projects$.pipe(takeUntil(timer$), flatMap(projects => projects));
        const groupedObservables = data.pipe(
          groupBy(projects => projects.rootId),
          tap( a => console.log('groupBy:' + a.key))
        );
        const merged = groupedObservables.pipe(
          mergeMap(a => a.pipe(toArray())),
          shareReplay(1),
          tap( a => console.log('final:' + JSON.stringify(a)))
        );
        return merged;
      }

Желаемый результат:

Object{  //Root of 1
  id: 1,
  title: 'zebra',
  rootId: null
}
Object{
  id: 3, //child of 1
  title: 'Bobcats',
  rootId: 1
} 
Object{
  id: 5, //child of 1
  title: 'Marigolds',
  rootId: 1
}
Object{
  id: 2, //root of 2
  title: 'algebra',
  rootId: 2
}
Object{
  id: 4,  //child of 2
  title: 'dogs',
  rootId: 2
}
Object{
  id: 6,  //unaffiliated
  title: 'whatever',
  rootId: null
}
Object{
  id: 7, //unaffiliated
  title: 'peppercorns',
  rootId: null
}

Требование состоит в том, чтобы группы, идентифицированные rootId, появлялись в последовательности перед их дочерними элементами (дочерние элементы появляются после их root), а неаффилированные - перечислены вместе. корни идентифицируются, когда id = rootId, потомки идентифицируются, когда rootId! = null && id! = rootId. Неаффилированные лица идентифицируются нулевым root идентификатором.

В настоящее время отправляется только последняя группа. Как я могу вернуть наблюдаемое, которое излучает все группы и в правильной последовательности? - спасибо

1 Ответ

2 голосов
/ 06 августа 2020

groupBy принимает поток объектов и выдает одну группу, когда поток завершается, он не работает с потоками массивов. Вам нужно сканирование. Сканирование похоже на уменьшение, но оно излучается каждый раз, когда исходный поток излучает, а не один раз в конце.

Я не совсем понимаю, чего вы пытаетесь достичь, задавая свой вопрос, но это должно помочь вам начать

sourceThatEmitsArrays.pipe(
  scan(
   (results, emittedArray) => functionThatAddsEmittedArrayToResults(results, emittedArray),
   [] // Start with an empty array
  )
)

Это то же самое, что и обычная функция сокращения для массивов, но выдает результаты каждый раз, когда источник генерирует.

functionThatAddsEmittedArrayToResults будет выглядеть примерно так:

(results, array) => array.reduce(
  (newResults, current) => {
    const group = findCurrentGroupInNewResultsOrCreateNewGroup(newResults, current);
    replacePreviousGroupInResultsOrAddTheNewOne(newResults, group);
    return newResults;
  },
  results // Start with previous results
)
...