Неудобное поведение mergeMap - PullRequest
       10

Неудобное поведение mergeMap

0 голосов
/ 12 сентября 2018

В настоящее время я работаю над способом загрузки файлов, который требует от меня ограничить число одновременных запросов, поступающих.

Я начал с того, что написал прототип того, как он должен обрабатываться

const items = Array.from({ length: 50 }).map((_, n) => n);
from(items)
  .pipe(
    mergeMap(n => {
      return of(n).pipe(delay(2000));
    }, 5)
  )
  .subscribe(n => {
    console.log(n);
  });

И это сработало, однако, как только я поменял of на фактический вызов.Он обрабатывает только один блок, так что, скажем, 5 из 20 файлов

from(files)
  .pipe(mergeMap(handleFile, 5))
  .subscribe(console.log);

Функция handleFile возвращает вызов моей пользовательской реализации ajax

import { Observable, Subscriber } from 'rxjs';
import axios from 'axios';

const { CancelToken } = axios;

class AjaxSubscriber extends Subscriber {
  constructor(destination, settings) {
    super(destination);
    this.send(settings);
  }

  send(settings) {
    const cancelToken = new CancelToken(cancel => {
      // An executor function receives a cancel function as a parameter
      this.cancel = cancel;
    });
    axios(Object.assign({ cancelToken }, settings))
      .then(resp => this.next([null, resp.data]))
      .catch(e => this.next([e, null]));
  }

  next(config) {
    this.done = true;
    const { destination } = this;
    destination.next(config);
  }

  unsubscribe() {
    if (this.cancel) {
      this.cancel();
    }
    super.unsubscribe();
  }
}

export class AjaxObservable extends Observable {
  static create(settings) {
    return new AjaxObservable(settings);
  }

  constructor(settings) {
    super();
    this.settings = settings;
  }

  _subscribe(subscriber) {
    return new AjaxSubscriber(subscriber, this.settings);
  }
}

Так что это выглядит примерно такэто как

function handleFile() {
  return AjaxObservable.create({
    url: "https://jsonplaceholder.typicode.com/todos/1"
  });
}

CodeSandbox

Если я удаляю параметр параллелизма из функции карты слияния, все работает нормально, но загружает все файлы одновременно.Есть ли способ это исправить?

1 Ответ

0 голосов
/ 13 сентября 2018

Оказывается, проблема в том, что я не вызывал complete() метод внутри AjaxSubscriber, поэтому я изменил код так:

pass(response) {
  this.next(response);
  this.complete();
}

А из axios звоните:

axios(Object.assign({ cancelToken }, settings))
  .then(resp => this.pass([null, resp.data]))
  .catch(e => this.pass([e, null]));
...