Излучение при отмене / заполнении внутренней наблюдаемой - PullRequest
1 голос
/ 25 апреля 2020

Мне нужно понять, была ли внутренняя наблюдаемая не завершена успешно (все отписались) и в этом случае выдать значение. Вид defaultIfEmpty, но он не работает.

  1. Есть триггер (интервал).
  2. Есть сложная логика c, которая может потреблять много времени (таймер).
  3. Когда триггер запускается снова и вызывает отмену сложной логики c (switchMap), я хочу получать уведомления об этом в потоке (для выдачи значения).

Вот пример stackblitz.com https://stackblitz.com/edit/angular-pf8z5z?file=src%2Fapp%2Fapp.component.ts

код

import {Component, OnDestroy, OnInit} from '@angular/core';
import * as rxjs from 'rxjs';
import * as operators from 'rxjs/operators';

@Component({
    selector: 'my-app',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit, OnDestroy {
    name = 'Angular';

    protected destroy$ = new rxjs.Subject();

    ngOnInit(): void {
        rxjs.interval(1000).pipe(
            operators.map(() => `${Math.random()}`),
            operators.switchMap(taskId => rxjs.timer(Math.random() * 1000 + 500).pipe(
                operators.mapTo([taskId, 'executed']),

                // something complicated that can be canceled
                // TASK: somehow in case of the cancellation I want to notify the stream

                // start indicator, works fine
                operators.startWith([taskId, 'start']),

                // end indicator, TODO, make it work if the pipe was cancelled
                // when the pipe has been completed - it works.
                operators.endWith([taskId, 'end']),
                operators.catchError(() => rxjs.of([taskId, 'end'])),

                // this works correctly, but it doesn't emit a value.
                operators.finalize(() => console.log(taskId, 'end')),
            )),

            // below is the code that doesn't belong to the task.
            // it displays the issue in the console.

            // checking active tasks
            operators.scan((activeTasks, [taskId, action]) => {
                if (action === 'start') {
                    activeTasks.push(taskId);
                }
                if (action === 'end') {
                    activeTasks.splice(activeTasks.indexOf(taskId), 1);
                }
                if (action === 'cancelled') {
                    activeTasks.splice(activeTasks.indexOf(taskId), 1);
                }
                return [...activeTasks];
            }, []),

            // It should always display just one active task
            // because all others were completed / canceled.
            operators.tap(console.log),
            operators.takeUntil(this.destroy$),
        ).subscribe();
    }

    ngOnDestroy(): void {
        this.destroy$.next();
        this.destroy$.complete();
    }
}

ОБНОВЛЕНО

Возможное решение с функциями старой школы.

operators.switchMap(function(taskId, idx) {
    const self = this;
    return rxjs.timer(Math.random() * 1000 + 500).pipe(
        operators.mapTo([taskId, 'executed']),
        operators.startWith([taskId, 'start']),

        // SOLUTION: this works correctly, but has ugly access to switchMap source.
        operators.finalize(() => {
            self.destination.next([taskId, 'end']);
        }),
    );
}),

Ответы [ 3 ]

1 голос
/ 25 апреля 2020

Я думаю, что это должно работать:

rxjs.interval(1000).pipe(
  operators.tap(() => console.log('start')),
  operators.switchMap((_, idx) => rxjs.timer(1500).pipe(
    idx > 0 ? operators.startWith('completed') : rxjs.identity,
  )),
  operators.tap(() => console.log('end')),
  operators.takeUntil(this.destroy$),
).subscribe(console.log, console.log);

В предоставленной функции switchMap у вас есть доступ к index.

Если idx === 0, это означает, что нет активной внутренней наблюдаемой, поэтому от нее не следует отказываться.

В противном случае, если текущая внутренняя наблюдаемая не завершена тем временем, если приходит новое внешнее значение, это означает, что активные внутренние объекты будут отменены, это то, что указывает startWith.

Почему endWith не работает

operators.switchMap(() => rxjs.timer(1500).pipe(
  operators.finalize(() => console.log('this works')),
  operators.endWith('cancelled'),
)),

Прежде всего, важно отметить, что

src$.pipe(endWith(value))

- это то же самое, что и

concat(src$, of(value))

, что по существу совпадает с:

of(src$, of(value))
 .pipe(
  mergeMap(obs$ => obs$, 1) // concurrency set to 1
)

Как вы знаете, mergeMap обрабатывает внутренние наблюдаемые. Внутренняя наблюдаемая требует внутреннего подписчика . В этом случае каждый внутренний подписчик будет добавлен в список _subscriptions получателя. В этом случае целевой абонент является внутренним абонентом switchMap. Иными словами, все внутренние подписчики mergeMap могут рассматриваться как дочерние элементы внутреннего подписчика switchMap.

Когда новое внешнее значение перехватывается switchMap, оно отписывается от текущего внутреннего абонент. Когда это происходит, он отписывается от всех своих подписчиков-потомков, что будет делать то же самое для своих потомков и так далее. Таким образом, endWith не будет работать, потому что его потребитель (внутренний подписчик switchMap) отписывается.

EDIT - случай, когда внутренняя наблюдаемая завершает

  ngOnInit(): void {
    rxjs.interval(1000).pipe(
      operators.tap(() => console.log('start')),
      operators.switchMap(function (_, idx) { 
        return rxjs.timer(Math.random() * 1500).pipe(
        operators.tap(null, null, () => console.warn("complete!")),
        idx > 0 && this.innerSubscription ? operators.startWith(true) : rxjs.identity,
      )}),
      operators.tap(() => console.log('end')),
      operators.takeUntil(this.destroy$),
    ).subscribe(console.log, console.log);
  }

SwitchMapSubscriber.innerSubscription равен null, когда активных подписчиков нет (например, внутренняя наблюдаемая завершена).

Редактировать 2 - с использованием субъекта

Эта альтернатива предполагает использование Subject, который будет излучать каждый раз Внутренние объекты, поддерживаемые switchMap, отписываются.

const interruptSubject = new Subject();

src$ = src$.pipe(
  switchMap(
    (/* ... */) => timer(/* ... */)
      .pipe(
        /* ... */
        finalize(() => interruptSubject.next(/* ...your action here... */))
      )
  )
);

merge(src$, interruptSubject)
  .subscribe(/* ... */)
1 голос
/ 28 апреля 2020

Если я правильно понял проблему, вы хотите отправить сообщение, когда сработает триггер, и это сообщение должно отражать тот факт, что предыдущее выполнение было отменено и начинается новое.

Если это в этом случае, вам нужно изучить функцию, переданную в switchMap. В частности, такая функция должна возвращать Observable, который немедленно выдает сообщение об отмене и выдает результат длительной сложной обработки, как только обработка завершается. Такой Observable будет выглядеть так:

merge(of([prevTaskId, "cancelled"]), doComplexStuff(taskId))

, где doComplexStuff - это функция, охватывающая ваш код, подобный этому

function doComplexStuff(taskId: string) {
  prevTaskId = taskId;
  return timer(1500).pipe(
    mapTo([taskId, "executed"]),

    // something complicated that can be canceled
    // TASK: somehow in case of the cancellation I want to notify the stream

    // start indicator, works fine
    startWith([taskId, "start"]),

    // end indicator, TODO, make it work if the pipe was cancelled
    // when the pipe has been completed - it works.
    endWith([taskId, "end"]),
    catchError(() => of([taskId, "end"]))

    // this works correctly, but it doesn't emit a value.
    //operators.finalize(() => console.log(taskId, 'end')),
  );
}

Таким образом, решение может выглядеть следующим образом:

let prevTaskId;
interval(1000)
  .pipe(
    take(10),
    map(() => `${Math.random()}`),
    switchMap((taskId) =>
      merge(of([prevTaskId, "cancelled"]), doComplexStuff(taskId))
    ),

    // below is the code that doesn't belong to the task.
    // it displays the issue in the console.

    // checking active tasks
    scan((activeTasks, [taskId, action]) => {
      if (action === "start") {
        console.log("start", taskId);
        activeTasks.push(taskId);
      }
      if (action === "end") {
        console.log("end", taskId);
        activeTasks.splice(activeTasks.indexOf(taskId), 1);
      }
      if (action === "cancelled") {
        console.log("cancelled", taskId);
        activeTasks.splice(activeTasks.indexOf(taskId), 1);
      }
      return [...activeTasks];
    }, []),

    // It should always display just one active task
    // because all others were completed / canceled.
    tap(console.log)
  )
  .subscribe();

Здесь стекаблиц для этого кода

1 голос
/ 25 апреля 2020

Похоже, вам нужен оператор catchError:

ngOnInit(): void {
    rxjs.interval(1000).pipe(
      operators.tap(() => console.log('start')),
      operators.switchMap(() => rxjs.timer(1500).pipe(
        operators.finalize(() => console.log('this works')),

        // in case the timer observable above fails, it'll emit 'cancelled'
        operators.catchError((e) => rxjs.of('cancelled')),
      )),
      operators.takeUntil(this.destroy$),
    ).subscribe(console.log);
  }
...