Сначала предисловие, для пользы будущих читателей (специально не нацеленных на вас, потому что я часто вижу это):
90% времени вам не нужны Предметы
На самом деле это, вероятно, ближе к 99% времени только потому, что остальные 9% будут зарезервированы для людей, пишущих библиотеки интеграции.Большинство конечных пользователей RxJS не требуют Subjects
в обычном коде.
Теперь, когда это сделано для вашего конкретного вопроса.
Перейдите к основанию для быстрого ответа:).
Ваш метод Get
пытается заново изобрести колесо и вносит ошибку в ваше ожидаемое поведение.
let onGetCompleted = new Subject<any>();
let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();
this.http.get<T>(url+additionalParam, httpHeaders)
.pipe(
retry(MAX_RETRY)
// TODO add error handler - JP
// catchError()
).subscribe((data) => {
// *** Here is the problem ***
onGetCompleted.next(data);
onGetCompleted.unsubscribe();
})
return onGetCompleted$;
Как @martin проницательно упоминается в комментариях, forkJoin
требует, чтобы каждыйObservable
испустить хотя бы один раз и завершить.
Если вы только что вернули Observable
, который вы создали с помощью this.http.get<T>().pipe()
, непосредственно из этого метода, ваш код будет работать как положено.Однако из-за косвенного обращения через Subject
complete
никогда не вызывается, поэтому forkJoin
никогда не выполняется.
Каждая ваша отладочная подписка работает, потому что каждый поток испускает событие, он просто никогда не завершается.unsubscribe
в контексте RxJS означает отмену Observable
, он не отправляет сигнал завершения или ошибки, с точки зрения нисходящего потока поток просто останавливается, он никогда не завершается, события просто прекращают поступать иабоненту предлагается очистить любые ресурсы, которые он может выделить.В общем, это считается анти-паттерном, чтобы явно отписаться.См .: https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87
Возможно, вам следует использовать такие операторы, как takeUntil
, для управления подписками RxJS.Как правило, если вы видите, что две или более подписок управляются в одном компоненте, вам следует задаться вопросом, могли бы вы составить их лучше.
Обратите также внимание на то, что существует некоторая дополнительная опасность длявызов unsubscribe
явно для Subject
, поскольку он влияет на все текущие и будущие подписки на Subject
, что делает его непригодным для использования.
Исправление
Вместо диктовки логики жизненного цикла в Get
метод и, возможно, выстрел в себя, пусть stream
справится сам.
public Get<T>(url, payload): Observable<T> {
let additionalParam = "";
if(payload) {
additionalParam = "?";
for (var key in payload) {
if(additionalParam != "?") additionalParam += "&"
additionalParam += key+"="+payload[key];
}
}
return this.http.get<T>(url+additionalParam, httpHeaders)
.pipe(retry(MAX_RETRY))
}
Теперь, когда вы используете его, он должен вести себя как положено.Единственное предостережение в том, что в текущей реализации вы, похоже, пытаетесь эмулировать поведение «только один раз».Если вам нужно сохранить это поведение, вы можете сделать это с помощью операторов.Например, если у меня есть два места, которым нужен результат одного и того же вызова, я могу сделать:
const onNext = (loc) => () => console.log(`At ${loc}`);
const source$ = this._http.Get(url, payload).pipe(
tap(onNext('pipe')),
publishLast() // This returns a ConnectableObservable
);
// Sets up both subscriptions
source$.subscribe(onNext('first subscriber'));
source$.subscribe(onNext('second subscriber'));
// Executes the GET call by connecting the underlying subscriber to the source
source$.connect();
// Console
// At pipe
// At first subscriber
// At second subscriber
Так как это похоже на код инициализации, который будет вызываться вашим приложением только один раз при запуске, хотядаже этот шаг может и не понадобиться.