RxJs: рекурсивно разбивать на страницы через API и находить значения из списка - PullRequest
1 голос
/ 21 марта 2019

Я использую rxjs v6.4.0.Я пытаюсь разбить на страницы через API поиск очень специфического канала, где имя равно "разработка".Я использую expand для рекурсивного вызова API и получения новых страниц.Конечный результат дает мне объединенный список каналов.Затем я отфильтровываю все каналы, название которых не равно «развитию».Однако я получаю сообщение об ошибке: TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

const Rx = require('rxjs')
const Rx2 = require('rxjs/operators')

const getChannel = (cursor) => {
  return this.service.getData(`${url}?cursor=${cursor || ''}`)
      .pipe(Rx2.map(resp => JSON.parse(resp.body)))
      .pipe(Rx2.expand(body => { // recurse the api until no more cursors
      return body.response_metadata && 
        body.response_metadata.next_cursor ? 
        getChannel(body.response_metadata.next_cursor) : Rx.EMPTY
    }))
    .pipe(Rx2.pluck('channels'))
    .pipe(Rx2.mergeAll()) // flattens array
    .pipe(Rx2.filter(c => {
      console.log('finding', c.name === 'development')
      return c.name === 'development'
    }))
}

Ответы [ 2 ]

2 голосов
/ 21 марта 2019

Обратный вызов find должен возвращать логическое значение, а не наблюдаемое.Например,

find(c => c.name === 'development')

ОБНОВЛЕНИЕ

Вот ваш модифицированный пример.Я удалил генераторы, так как они сложнее, чем требуется в нашем случае.

const { of, EMPTY, throwError } = rxjs;
const { filter, tap, expand, pluck, mergeAll } = rxjs.operators;

const DATA = 
  [ {channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1}
  , {channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2}
  , {channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor: false}
  ];

function getChannel(){
  return getBlock()
    .pipe(
        expand(x => x.cursor ? getBlock(x.cursor) : EMPTY),
        pluck('channels'),
        mergeAll(),
        filter(c => c.name === 'devcap')
    )
}

getChannel().subscribe({
  next: console.log,
  error: console.error
});


function getBlock(index = 0) {
  if (index >= DATA.length){
    throwError('Out of bounds');
  }

  return of(DATA[index]);
}

ОБНОВЛЕНИЕ 2

Ваше решение не работает из-за рекурсии, выполняемой черезисключительно getChannel().Когда вы делаете expand - вы запускаете другой цикл через getChannel().Это означает, что вы запускаете pluck-mergeAll-filter chain дважды для каждого рекурсивно извлеченного значения!Дважды потрепав и обольщая, вы получите undefined - следовательно, ошибку.

На вашей игровой площадке - попробуйте выделить этот код

let getValue = ()=>{
    const next = gen.next();
    if (!next || !next.value) { return EMPTY; }
    return next.value;
}

и использовать его в расширении, например, так::

let getChannel = () => {
   return getValue()
      .pipe(
        expand(body => {
          return body.cursor ? getValue() : EMPTY
        }),
        pluck('channels'),
        mergeAll(),
        filter(c => c.name === 'devcap'),
      )
}
0 голосов
/ 22 марта 2019

Дайте мне знать, если это функция, которую вы ищете https://codepen.io/jeremytru91/pen/wOQxbZ?editors=1111

const {
  of,
  EMPTY
} = rxjs;

const {
  filter,
  tap,
  expand,
  take,
  pluck,
  concatAll,
  flatMap,
  first
} = rxjs.operators;

function* apiResponses() {
  yield of({channels: [{id: 123, name: 'test'}, {id:4, name: 'hello'}], cursor: 1});
  yield of({channels:[{id: 3, name: 'react'}, {id: 5, name: 'devcap'}], cursor:3});
  yield of({channels:[{id: 1, name: 'world'}, {id: 2, name: 'knows'}], cursor: 2});
  yield of({channels:[{id: 4, name: 'react'}, {id: 6, name: 'devcap'}], cursor:4});
}

let gen = apiResponses();

function getChannel() {
  return gen.next().value // simulate api call
    .pipe(
      expand(body => {
        return body.cursor ? gen.next().value : EMPTY
      }),
      pluck('channels'),
      flatMap(channels => {
        const filtered = channels.filter(channel => channel.name === 'devcap')
        if(filtered.length) {
          return filtered;
        }
        return EMPTY;
      }),
    first()
    );
}


getChannel().subscribe(data => {
  console.log('DATA!! ', data)
  }, e => {
  console.log('ERROR', e)
  throw e
})
...