Rxjs Тема № следующая асинхронная или нет? - PullRequest
0 голосов
/ 28 июня 2018

Привет, интересно, следующий ли код выполняется последовательно? то есть всегда ли ведется запись в журнал " Рабочий завершил задачу C " после записи в журнал " Выполнено 3 задачи "?

Длинный вопрос: с оператором scan я могу быть уверен, что задачи выполняются последовательно, что меня не беспокоит. Что меня беспокоит, так это то, что я хочу, чтобы последняя подписка делала что-то только после того, как задача С была выполнена, и я не уверен, гарантирует ли это положение, в котором находится o.complete(). например start() запускает do.("A") -> do.("B") -> do.("C"), не дожидаясь завершения сканирования, и сразу запускает o.complete(), выдавая:

Worker has finished task C
Doing task A
Finished 1 task(s)
Doing task B
Finished 2 task(s)
Doing task C
Finished 3 task(s)

Если это так, как вы исправляете код, чтобы добиться того, что я описал?

https://stackblitz.com/edit/typescript-xhhwme

class Worker {
  private tasks: Subject<string>;
  public init(): Observable<number> {
    this.tasks = new Subject<string>();
    return this.tasks.scan((count, task) => {
      console.log("Doing task " + task);
      return ++count;
    }, 0).asObservable();
  }
  public do(task: string): void {
    this.tasks.next(task);
  }
}

function start(worker: Worker): Observable<void> {
  return Observable.create(o => {
    const monitor = worker.init();
    monitor.subscribe(c => console.log("Finished " + c + " task(s)"));
    worker.do("A");
    worker.do("B");
    worker.do("C");
    o.complete();
    worker.do("D");
  });
}

const worker = new Worker();
start(worker).subscribe({
  complete: () => console.log("Worker has finished task C")
});

1 Ответ

0 голосов
/ 28 июня 2018

TLDR: Subject.next синхронно.

Реактивные потоки являются синхронными, если источник является синхронным, если вы явно не сделаете их асинхронными или не смешаете их с асинхронными потоками. Ничего из этого не происходит в вашем коде. Некоторые примеры:

//Synchronous
of(1,2)
  .subscribe(console.log);

//asynchronous because of async source
interval(1000)
  .subscribe(console.log);

//aynchronous because one stream is async (interval)
of(1,2)
  .pipe(
    mergeMap(x => interval(1000).pipe(take(2)))
  )
  .subscribe(console.log);

//async because we make it async
of(1,2, asyncScheduler)
  .subscribe(console.log);

Что происходит в вашем примере? Все внутри Observable.create будет выполнено немедленно. Когда вы вызываете worker.do("A");, this.tasks.next(task); выдает новое значение и выполняется цепочка потоков tasks (синхронно). То же самое происходит с B и C.

При вызове o.complete(); поток start(worker) завершается и печатается "Worker has finished task C". Затем D выполняется потоком tasks.

Более подробную информацию об асинхронном / синхронном поведении можно найти в следующих статьях:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...