rxjs BehaviorSubject next () вызывается в цикле forEach, что приводит к отмене требования сети - PullRequest
0 голосов
/ 02 ноября 2018

У меня есть действительно забавная проблема с rxjs, которую нужно решить ...

  1. Итак, у меня есть функция, которая получает массив объектов от моего компонента синтаксического анализатора csv uploadCSV()
  2. затем разбивает массив на новые массивы => элементы для обновления line: 10 => элементы для создания line: 11
  3. Затем я выполняю цикл foreach и отправляю каждый объект в BehaviorSubject и увеличиваю счетчики соответственно.

Проблема, с которой я столкнулся, заключается в том, что при первом запуске объект отключается, но затем я вижу в сетевом инструменте, что все последующие запросы отменяются. Я предполагаю, что это потому, что наблюдаемая получает новое значение, прежде чем запрос будет завершен.

Не знаете, как решить эту проблему? dinstinctUntilChanged и возможно отменить?

uploadCSV() {
    const dialogConfig = new MatDialogConfig();
    dialogConfig.autoFocus = true;
    dialogConfig.width = '100vw';

    const dialogRef = this.dialog.open(DataUploadComponent, dialogConfig);

    dialogRef.afterClosed().subscribe(airlines => {
        airlines = airlines.map(airline => ({...airline, id: +airline.id}));
        const update = airlines.filter(airline => this.airlines.some(item => item.id === airline.id));
        const create = airlines.filter(airline => !this.airlines.some(item => item.id === airline.id));
        create.forEach(item => {
            this.newAirlineSubject$.next(item);
            this.created = ++this.created;
        });
        update.forEach(item => {
            this.updateAirlineSubject$.next(item);
            this.updated = ++this.updated;
        });
        this.snackBar.open(
            `${this.updated} items updated : ${this.created} items created`, null,
            {
                duration: 3000
            });
        this.snackBar._openedSnackBarRef.afterOpened().subscribe(() => this.updated = this.created = 0);
    });
}

const newAirline$ = this.newAirlineSubject$
    .pipe(
        filter(airline => !!airline),
        tap(() => this.saveStatus = 1),
        switchMap(airline => this.airlineService.newAirline(airline)),
        tap(() => this.saveStatus = 2),
        map(airline => {
            this.airlines = [airline, ...this.airlines];
            return this.airlines;
        }),
        tap(() => this.saveStatus = 0),
        shareReplay()
    );

const updateAirline$ = this.updateAirlineSubject$
    .pipe(
        filter(airline => !!airline),
        tap(() => this.saveStatus = 1),
        switchMap(airline => this.airlineService.updateAirline(airline)),
        tap(() => this.saveStatus = 2),
        map(airline => {
            const index = this.airlines.findIndex(item => item.id === airline.id);
            this.airlines = [...this.airlines, this.airlines[index] = airline];
            return this.airlines;
        }),
        tap(() => this.saveStatus = 0),
        shareReplay()
    );

const airlines$ = this.airlineService.getAllAirlines()
    .pipe(
        map(airlines => {
            this.airlines = [...airlines];
            return this.airlines;
        }),
        shareReplay()
    );

this.airlines$ = airlines$.pipe(
    merge(newAirline$, updateAirline$)
);

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Измените switchMap на concatMap, потому что вы next() в цикле for, он будет излучать синхронно, а switchMap откажется от внутренней наблюдаемой, как только будет исходное излучение, так что в итоге вы получите только последнее излученное значение.

const newAirline$ = this.newAirlineSubject$
    .pipe(
        filter(airline => !!airline),
        tap(() => this.saveStatus = 1),
        concatMap(airline => this.airlineService.newAirline(airline)),
        tap(() => this.saveStatus = 2),
        map(airline => {
            this.airlines = [airline, ...this.airlines];
            return this.airlines;
        }),
        tap(() => this.saveStatus = 0),
        shareReplay()
    );

const updateAirline$ = this.updateAirlineSubject$
    .pipe(
        filter(airline => !!airline),
        tap(() => this.saveStatus = 1),
        concatMap(airline => this.airlineService.updateAirline(airline)),
        tap(() => this.saveStatus = 2),
        map(airline => {
            const index = this.airlines.findIndex(item => item.id === airline.id);
            this.airlines = [...this.airlines, this.airlines[index] = airline];
            return this.airlines;
        }),
        tap(() => this.saveStatus = 0),
        shareReplay()
    );
0 голосов
/ 02 ноября 2018

Трудно получить полную картину, не видя airlineService.newAirline. Я снова с подозрением отношусь к this.saveStatus. Мне нужно было бы увидеть больше кода, чтобы понять, для чего он предназначен, но по мере выпуска каждого нового элемента для него будет возвращаться значение 1 для всех элементов. Я предполагаю, что это не то, что вы хотите, чтобы произошло. Если бы вы могли уточнить эти два, это помогло бы.

Кстати, вот некоторые странные вещи:

    const update = airlines.filter(airline => this.airlines.some(item => item.id === airline.id));
    const create = airlines.filter(airline => !this.airlines.some(item => item.id === airline.id));

Редактировать: Извинения, я пропустил this и интерпретировал airlines и this.airlines как один и тот же массив - это было бы странно ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...