Рекурсия и наблюдаемые RxJs - PullRequest
0 голосов
/ 28 февраля 2020

Я выполняю пагинацию внутри и наблюдаемом потоке. Разбивка на страницы осуществляется с помощью курсора и общего подсчета с использованием рекурсии.

Я могу создать каждую страницу, используя следующий код observer.next(searches);, кстати, я хотел бы использовать только наблюдаемый и без обещаний, но Я не могу express рекурсия с использованием операторов Rx Js.

Есть предложения?

    const search = id =>
      new Observable(observer => { recursePages(id, observer) })

    const recursePages = (id, observer, processed, searchAfter) => {
      httpService.post(
        "http://service.com/search",
        {
          size: 50,
          ...searchAfter ? { search_after: searchAfter } : null,
          id,
        })
        .toPromise() // httpService.post returns an Observable<AxiosResponse>
        .then(res => {
          const body = res.data;
          const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
          observer.next(searches);
          const totalProcessed = processed + searches.length;
          if (totalProcessed < body.data.total) {
            return recursePages(id, observer, totalProcessed, searches[searches.length - 1].cursor);
          }
          observer.complete();
        })
    }

    // General Observer
    incomingMessages.pipe(
        flatMap(msg => search(JSON.parse(msg.content.toString()))),
        concatAll(),
    ).subscribe(console.log),    


1 Ответ

0 голосов
/ 28 февраля 2020

эти методы будут рекурсивно собирать все страницы и отправлять их в массив. затем страницы могут быть переданы из, как показано:

// break this out to clean up functions
const performSearch = (id, searchAfter?) => {
  return httpService.post(
    "http://service.com/search",
    {
      size: 50,
      ...searchAfter ? { search_after: searchAfter } : null,
      id,
    });
}

// main recursion
const _search = (id, processed, searchAfter?) => {
  return performSearch(id, searchAfter).pipe( // get page
    switchMap(res => {
      const body = res.data;
      const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
      const totalProcessed = processed + searches.length;
      if (totalProcessed < body.total) {
        // if not done, recurse and get next page
        return _search(id, totalProcessed, searches[searches.length - 1].cursor).pipe(
          // attach recursed pages
          map(nextPages => [searches].concat(nextPages)
        );
      }
      // if we're done just return the page
      return of([searches]);
    })
  )
}

// entry point
// switch into from to emit pages one by one
const search = id => _search(id, 0).pipe(switchMap(pages => from(pages))

, если вам действительно нужно, чтобы все страницы излучали одну за другой, прежде чем все они будут извлечены, например, чтобы вы могли показать страницу 1 как только он будет доступен, а не ждать на странице 2+, это можно сделать с помощью некоторых настроек. дайте мне знать.

РЕДАКТИРОВАТЬ: этот метод будет излучать один за другим

const _search = (id, processed, searchAfter?) => {
  return performSearch(id, searchAfter).pipe( // get page
    switchMap(res => {
      const body = res.data;
      const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
      const totalProcessed = processed + searches.length;
      if (totalProcessed < body.total) {
        // if not done, concat current page with recursive call for next page
        return concat(
          of(searches),
          _search(id, totalProcessed, searches[searches.length - 1].cursor)
        );
      }
      // if we're done just return the page
      return of(searches);
    })
  )
}
const search = id => _search(id, 0)

в результате вы получите наблюдаемую структуру, такую ​​как:

concat(
  post$(page1),
  concat(
    post$(page2),
    concat(
      post$(page3),
      post$(page4)
    )
  )
)

и так как вложенный concat() операций сводятся к уплощенной структуре, эта структура будет уменьшаться до:

concat(post$(page1), post$(page2), post$(page3), post$(page4))

, что вам нужно, и запросы выполняются последовательно.

также кажется, что расширение может сделать хитрость в соответствии с комментарием @NickL, примерно так:

search = (id) => {
  let totalProcessed = 0;
  return performSearch(id).pipe(
    expand(res => {
      const body = res.data;
      const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
      totalProcessed += searches.length;
      if (totalProcessed < body.data.total) {
        // not done, keep expanding
        return performSearch(id, searches[searches.length - 1].cursor);
      }
      return EMPTY; // break with EMPTY
    })
  )
}

, хотя я никогда раньше не использовал расширение, и это основано на каком-то очень ограниченном тестировании, но я почти уверен, что это работает.

оба эти метода могут использовать оператор уменьшения (или сканирования) для сбора результатов, если вы когда-либо хотели:

search(id).pipe(reduce((all, page) => all.concat(page), []))
...