Цепная подписка на массив наблюдаемых с использованием rxjs - PullRequest
0 голосов
/ 01 мая 2018

У меня есть массив файлов, которые я хотел бы загрузить (или, по крайней мере, , попробуйте , чтобы загрузить, отдельный сбой в порядке) в Angular 5, по одному, а затем узнать, когда все подписки имеют завершено.

В старые времена Javascript я делал бы это, используя цепные обратные вызовы, но, поскольку сейчас у нас есть замечательные инструменты, такие как rxjs, я чувствую, что есть лучший, более «реактивный» способ сделать это.

Итак: Каков наилучший способ сделать это с rxJS?

У меня есть провайдер Angular, который загружает один файл и возвращает Observable; Я хотел бы попробовать загрузить каждый файл в массиве по отдельности и знать, когда они все завершены.

В моем примере ниже я заменил провайдера простым субъектом, который завершается со случайным успехом или ошибкой через случайное количество времени, пытаясь эмулировать шаткое подключение к Интернету.

Проблема: Когда я использую Observable.combineLatest (), я получаю окончательный результат только тогда, когда ВСЕ наблюдаемые имеют следующий () результат: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]. Если не все Наблюдаемые завершены, я никогда не получу результат вообще.

Кроме того, Наблюдаемые запускаются не по одному, а скорее сразу. При использовании с запросом AJAX это может привести к перегрузке соединения с ячейкой.

Есть мысли о том, как к этому подойти?

constructor() {
    let observables = [];

    for (var i = 0; i < 100; i++) {
        observables.push(this.testObservable(i));
    }


    Observable.combineLatest(observables)
        .subscribe(res => {
            console.log('success', res);
        }, err => {
            console.log('errors', err);
        })
}



testObservable(param) {
    let subject = new Subject;

    let num = Math.random() * 10000;
    console.log('starting', param)
    setTimeout(() => {

        if (Math.random() > 0.5) {
            console.log('success', param);
            subject.next(param);
        } else {
            console.log('error', param);
            subject.error(param);
        }

        subject.complete();
    }, num);

    return subject;
}

Ответы [ 2 ]

0 голосов
/ 09 мая 2018

Наблюдаемые дочерние элементы должны либо выдавать next(), либо они должны выдавать ошибку. В обоих случаях наблюдаемый ребенок должен завершиться. Если наблюдаемый ребенок не завершает, то у вас есть ошибка в наблюдаемом ребенке. Если вы знаете, что у наблюдаемой дочерней функции есть ошибка и она не будет завершена, вы можете использовать тайм-аут, но вам действительно не нужно это делать.

Так как ошибки в дочерних наблюдаемых являются хорошими, вы должны их ловить. И наконец, если вы хотите буферизовать элементы, чтобы они не отправлялись на сервер сразу, вы можете сделать это тоже.

Собираем все вместе ( RxJS Playground Link ):

const start = Date.now();

function elapsed() {
  return Date.now() - start;
}

function dummyObservable(delay, error) {
  // An observable that fails
  if (error) {
    return Rx.Observable.throw(error);
  }
  // An observable that succeeds after some amount of time
  if (delay) {
    return Rx.Observable.create(observer => {
      console.log(elapsed() + ': Request to server emitted');
      setTimeout(() => observer.next(), delay);
    });
  }
  // An observable that never completes (you really shouldn't have these)
  return Rx.Observable.create(() => {});
}

function formatResult(result) {
  if (result.failed) {
    return 'FAIL(' + result.error + ')';
  } else {
    return 'PASS';
  }
}

const obs1 = dummyObservable(1000);
const obs2 = dummyObservable(500);
const fails = dummyObservable(null, new Error('This one fails'));
const neverFinishes = dummyObservable();

const observables = [obs1, obs2, fails, neverFinishes];
// We only want the first response.  Only needed if your source observables aren't completing after emitting one item
const firstOnly = observables.map(obs => obs.first());
// Only allow 5 seconds and abort if no response after 5 seconds
const timeoutsHandled = firstOnly.map(obs => obs.timeout(5000));
// If any failures occur then handle them
const failuresHandled = timeoutsHandled.map(obs => obs.map(() => ({ failed: false })).catch((err) => Rx.Observable.of({ failed: true, error: err })));

const buffered = [];
// Buffer the request so 200 ms pass between each.
for(let i = 0; i < failuresHandled.length; i++) {
  const delay = i * 200;
  buffered.push(Rx.Observable.of([null]).delay(delay).first().switchMap(() => failuresHandled[i]));
}

const combined = Rx.Observable.combineLatest(buffered);
combined.first().subscribe(
  (values) => console.log(elapsed() + ': values: ' + values.map(v => formatResult(v))),
  err => console.log(err)
);
0 голосов
/ 01 мая 2018

Вы можете использовать combineLatest, чтобы дождаться завершения массива наблюдаемых. Он будет излучать, как только каждое наблюдаемое, которое он предоставляет, излучает, по крайней мере, один раз.

let files = [uri1, uri2, uri3];
const observables = files.map(file => this.addMedia({ uri: file, post_id: res.post.Post.id }));

Observable.combineLatest(observables).subscribe(() => console.log('All complete'));

Если вы хотите, чтобы Наблюдаемые выполнялись один за другим, вы можете использовать concat (для заказа) или merge, если порядок не важен.

Чтобы отлавливать ошибки, вы можете добавить оператор catch к каждому наблюдаемому объекту и вернуть пустой наблюдаемый объект или что-то более подходящее.

this.addMedia({ uri: file, post_id: res.post.Post.id })
    .do(val => console.log(`emitting: ${val}`))
    .catch(err => {
      console.log(`error: ${err}`);
      return Observable.empty();
    });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...