forkJoin не ждет завершения всех наблюдаемых - PullRequest
0 голосов
/ 08 марта 2020

import { Component } from "@angular/core";
import { Observable, of, throwError, Subscription, forkJoin } from "rxjs";
import { mergeMap, map, delay, timeout, first, take } from "rxjs/operators";
import { ajax } from "rxjs/ajax";

class Test {
  id: number;
  firstObj: FirstObj;
  secondObj: SecondObj;
  thirdObj: string;
}

interface FirstObj {
  firstObs: boolean;
  result: string;
}

interface SecondObj {
  secondObs1: boolean;
  secondObs2: boolean;
}

@Component({
  selector: "my-app",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent {
  private workOrderSubscription: Subscription;
  name = "Angular";
  obsOne: Observable<any> = of("First Obs")
    .pipe(delay(6000))
    .pipe(
      map(res => {
        return {
          firstObs: true,
          result: res
        };
      })
    );

  dataTwo: SecondObj = { secondObs1: true, secondObs2: false };

  obsTwo: Observable<any> = of(this.dataTwo);
  obsThree: Observable<any> = of("error");

  private getId(): Observable<any> {
    return of("id" as string);
  }

  public retrieveWork(): Observable<Test> {
    const test: Test = new Test();
    this.getId().subscribe((id: number) => {
      test.id = id as number;
      forkJoin(this.obsOne, this.obsTwo, this.obsThree).subscribe(
        ([first, second, third]: [FirstObj, SecondObj, string]) => {
          // some appropriate checks here
          test.firstObj = first as FirstObj;
          test.secondObj = second as SecondObj;
          test.thirdObj = third as string;
          console.log("first is " + first.result);
          console.log("second is " + second.secondObs1);
          console.log("third is " + third);
          console.log(`inside ******************** ${JSON.stringify(test)}`);
          return of(test);
        },
        error => {
          //error here
          console.log("GOT ERROR");
        }
      );
    });

    console.log(`returning ******************** ${JSON.stringify(test)}`);
    return of(test);
  }

  ngOnInit() {
    console.log("data ************************");
    this.retrieveWork()
      .pipe(timeout(10000))
      .subscribe(data => {
        {
          console.log("printing data ***************************");
          console.log(`${JSON.stringify(data)}`);
        }
      });
  }
  ngOnDestroy() {}
}


Я пробовал много способов, в основном то, что я пытаюсь сделать, находится в ngOnInit Я подписываюсь на get (), но данные, которые я получаю, не полны. Подписка не ждет, пока forkJoin полностью не вернется. Я не уверен, как я могу получить полные данные.

После того, как первая подписка завершится, она хотела бы, чтобы первая и вторая подписка завершились, а затем заполнили класс 'Test', а затем мой подписчик. в ngOnInit для получения данных.

Мой вывод выглядит так

data *********************** *

возврат ******************** {"id": "id"}

печать данных ***** **********************

{"id": "id"}

сначала это First Obs

секунда верна

третья ошибка

внутри ******************** {"id": "id" , "firstObj": {"firstObs": true, "result": "First Obs"}, "secondObj": {"secondObs1": true, "secondObs2": false}, "thirdObj": "error"}

Как видно из вышеприведенного вывода, печатается только идентификатор (после 1-й подписки), но после второй подписки подписчик в ngOnInit больше не получает данные.

Спасибо вы.

Ответы [ 2 ]

0 голосов
/ 10 марта 2020

Я заменил вложенную Observable на оператор отображения более высокого порядка (switchMap), и он, похоже, работает. (Но я не совсем понимаю, что это должно выводить.)

Вот что у меня есть:

import { Component } from "@angular/core";
import { Observable, of, forkJoin } from "rxjs";
import { switchMap, map, delay, timeout} from "rxjs/operators";

class Test {
  id: number;
  firstObj: FirstObj;
  secondObj: SecondObj;
  thirdObj: string;
}

interface FirstObj {
  firstObs: boolean;
  result: string;
}

interface SecondObj {
  secondObs1: boolean;
  secondObs2: boolean;
}


@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  title = "Angular";
  obsOne: Observable<any> = of("First Obs")
    .pipe(delay(6000))
    .pipe(
      map(res => {
        return {
          firstObs: true,
          result: res
        };
      })
    );

  dataTwo: SecondObj = { secondObs1: true, secondObs2: false };

  obsTwo: Observable<any> = of(this.dataTwo);
  obsThree: Observable<any> = of("error");


  // Not sure why this was sometimes a string and sometimes a number
  // Changed to a number.
  private getId(): Observable<number> {
    return of(7);
  }

  // Made this declarative so you could do further operations on the
  // stream as needed.
  myTest$ =
    this.getId()
      .pipe(
        // Added switchMap, which is a higher-order mapping operator
        // and automatically subscribes (and unsubscribes) to its inner Observables
        switchMap(id => {
          const test: Test = new Test();
          test.id = id;
          return forkJoin(this.obsOne, this.obsTwo, this.obsThree)
            .pipe(
              map(([first, second, third]: [FirstObj, SecondObj, string]) => {
                // some appropriate checks here
                test.firstObj = first as FirstObj;
                test.secondObj = second as SecondObj;
                test.thirdObj = third as string;
                console.log("first is " + first.result);
                console.log("second is " + second.secondObs1);
                console.log("third is " + third);
                console.log(`inside ******************** ${JSON.stringify(test)}`);
                return test;
              })
            )
        })
      );

  ngOnInit() {
    console.log("data ************************");
    this.myTest$
      .pipe(timeout(10000))
      .subscribe(data => {
        {
          console.log("printing data ***************************");
          console.log(`${JSON.stringify(data)}`);
        }
      });
  }

  ngOnDestroy() { }
}

Делает ли это то, что вы ожидаете?

0 голосов
/ 09 марта 2020

Наконец, похоже, это работает. Я новичок в RX JS. Любые предложения приветствуются.

Также обрабатываются все случаи ошибок.

import { Component } from "@angular/core";
import {
  Observable,
  of,
  throwError,
  Subscription,
  forkJoin,
  combineLatest
} from "rxjs";
import {
  mergeMap,
  map,
  delay,
  timeout,
  first,
  take,
  catchError
} from "rxjs/operators";
import { ajax } from "rxjs/ajax";

class Test {
  id: number;
  firstObj: FirstObj;
  secondObj: SecondObj;
}

interface FirstObj {
  firstObs: boolean;
  result: string;
}

interface SecondObj {
  secondObs1: boolean;
  secondObs2: boolean;
}

@Component({
  selector: "my-app",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent {
  private workOrderSubscription: Subscription;
  name = "Angular";

  dataone: FirstObj = { firstObs: true, result: "hai" };
  obsOne: Observable<any> = of(this.dataone).pipe(delay(1000));
  dataTwo: SecondObj = { secondObs1: true, secondObs2: false };
  obsTwo: Observable<any> = throwError(this.dataTwo);

  private getId(): Observable<any> {
    return of(1).pipe(delay(4000));
  }

  public get(): Observable<any> {
    let test: Test = new Test();
    return new Observable(subscriber =>
      this.getId().pipe(timeout(5000)).subscribe(
        (id: number) => {
          test.id = id;
          forkJoin(
            this.obsOne.pipe(timeout(1000)).pipe(catchError(error => of(null))),
            this.obsTwo.pipe(timeout(1000)).pipe(catchError(error => of(null))),
          ).subscribe(([first, second]) => {
            test.firstObj = first;
            test.secondObj = second;
            console.log("printing data ***************************2");
            console.log(`${JSON.stringify(test)}`);
            subscriber.next(test);
            subscriber.complete();
          });
        },
        err => {
          console.log("error while getting data");
          subscriber.error("error while getting data");
          subscriber.unsubscribe();
        }
      )
    );
  }

  ngOnInit() {
    let subscription: Subscription = this.get()
      .pipe(timeout(6000))
      .subscribe(
        (data: Test) => {
          console.log("in subscribe");
          console.log(`${JSON.stringify(data)}`);
        },
        err => {
          console.log("ERROR " + err);
          subscription.unsubscribe();
        }
      );
  }
  ngOnDestroy() {}
}
...