forkJoin не вызывает ответ - PullRequest
       4

forkJoin не вызывает ответ

0 голосов
/ 07 февраля 2019

У меня есть следующий код:

    let extension = this._http.Get('/settings/get_kv/credentials', {credential_key : "extensions"})
    let urlRef = this._http.Get('/settings/get_kv/credentials', {credential_key : "urlRef"})
    let product = this._http.Get('/settings/get_kv/credentials', {credential_key : "product"})
    let messageBackend = this._http.Get('/settings/get_kv/credentials', {credential_key : "messageBackend"})

    Observable.forkJoin([extension, urlRef, product, messageBackend]).subscribe((results:any) => {
        let apikvConfig = {...results[0], ...results[1], ...results[2], ...results[3]};
    });

мой getMethos выглядит так:

public Get<T>(url, payload): Observable<T> {
    let additionalParam = "";
    if(payload) {
        additionalParam = "?";
        for (var key in payload) {
            if(additionalParam != "?") additionalParam += "&"
            additionalParam += key+"="+payload[key];
        }
    }

    let onGetCompleted = new Subject<any>();
    let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();

    this.http.get<T>(url+additionalParam, httpHeaders)
        .pipe(
            retry(MAX_RETRY)

            // TODO add error handler - JP
            // catchError()
        ).subscribe((data) => {
            onGetCompleted.next(data);
            onGetCompleted.unsubscribe();
        })
    return onGetCompleted$;
}

Но forkJoin никогда не входит в подписку.

Я пыталсяслиться, а также конкатить, но они возвращают мой наблюдаемый, а не результат.

Что я делаю не так?

РЕДАКТИРОВАТЬ:

Я сделал:

Load(): Observable<any>{
    let extension = this._http.Get('/settings/get_kv/credentials', {credential_key : "extensions"})
    let urlRef = this._http.Get('/settings/get_kv/credentials', {credential_key : "urlRef"})
    let product = this._http.Get('/settings/get_kv/credentials', {credential_key : "product"})
    let messageBackend = this._http.Get('/settings/get_kv/credentials', {credential_key : "messageBackend"})

    extension.subscribe((res) => { console.log(res)});
    urlRef.subscribe((res) => { console.log(res)});
    product.subscribe((res) => { console.log(res)});
    messageBackend.subscribe((res) => { console.log(res)});


    let obs = Observable.forkJoin([extension, urlRef, product, messageBackend])

    obs.subscribe((results:any) => {
        let apikvConfig = {...results[0], ...results[1], ...results[2], ...results[3]};
        console.log(apikvConfig);
        return;
        this._storeApp.dispatch(new StoreUseraction.SetExtensionsValue(apikvConfig));
    });

    return obs;
}

каждый журнал возвращает значение

1 Ответ

0 голосов
/ 07 февраля 2019

Сначала предисловие, для пользы будущих читателей (специально не нацеленных на вас, потому что я часто вижу это):

90% времени вам не нужны Предметы

На самом деле это, вероятно, ближе к 99% времени только потому, что остальные 9% будут зарезервированы для людей, пишущих библиотеки интеграции.Большинство конечных пользователей RxJS не требуют Subjects в обычном коде.

Теперь, когда это сделано для вашего конкретного вопроса.

Перейдите к основанию для быстрого ответа:).

Ваш метод Get пытается заново изобрести колесо и вносит ошибку в ваше ожидаемое поведение.

let onGetCompleted = new Subject<any>();
let onGetCompleted$ : Observable<any> = onGetCompleted.asObservable();

this.http.get<T>(url+additionalParam, httpHeaders)
    .pipe(
        retry(MAX_RETRY)

        // TODO add error handler - JP
        // catchError()
    ).subscribe((data) => {
        // *** Here is the problem ***
        onGetCompleted.next(data);
        onGetCompleted.unsubscribe();
    })
return onGetCompleted$;

Как @martin проницательно упоминается в комментариях, forkJoin требует, чтобы каждыйObservable испустить хотя бы один раз и завершить.

Если вы только что вернули Observable, который вы создали с помощью this.http.get<T>().pipe(), непосредственно из этого метода, ваш код будет работать как положено.Однако из-за косвенного обращения через Subject complete никогда не вызывается, поэтому forkJoin никогда не выполняется.

Каждая ваша отладочная подписка работает, потому что каждый поток испускает событие, он просто никогда не завершается.unsubscribe в контексте RxJS означает отмену Observable, он не отправляет сигнал завершения или ошибки, с точки зрения нисходящего потока поток просто останавливается, он никогда не завершается, события просто прекращают поступать иабоненту предлагается очистить любые ресурсы, которые он может выделить.В общем, это считается анти-паттерном, чтобы явно отписаться.См .: https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87

Возможно, вам следует использовать такие операторы, как takeUntil, для управления подписками RxJS.Как правило, если вы видите, что две или более подписок управляются в одном компоненте, вам следует задаться вопросом, могли бы вы составить их лучше.

Обратите также внимание на то, что существует некоторая дополнительная опасность длявызов unsubscribe явно для Subject, поскольку он влияет на все текущие и будущие подписки на Subject, что делает его непригодным для использования.

Исправление

Вместо диктовки логики жизненного цикла в Get метод и, возможно, выстрел в себя, пусть stream справится сам.

public Get<T>(url, payload): Observable<T> {
    let additionalParam = "";
    if(payload) {
        additionalParam = "?";
        for (var key in payload) {
            if(additionalParam != "?") additionalParam += "&"
            additionalParam += key+"="+payload[key];
        }
    }


    return this.http.get<T>(url+additionalParam, httpHeaders)
        .pipe(retry(MAX_RETRY))
}

Теперь, когда вы используете его, он должен вести себя как положено.Единственное предостережение в том, что в текущей реализации вы, похоже, пытаетесь эмулировать поведение «только один раз».Если вам нужно сохранить это поведение, вы можете сделать это с помощью операторов.Например, если у меня есть два места, которым нужен результат одного и того же вызова, я могу сделать:

const onNext = (loc) => () => console.log(`At ${loc}`);

const source$ = this._http.Get(url, payload).pipe(
  tap(onNext('pipe')),
  publishLast() // This returns a ConnectableObservable
);


// Sets up both subscriptions
source$.subscribe(onNext('first subscriber'));
source$.subscribe(onNext('second subscriber'));
// Executes the GET call by connecting the underlying subscriber to the source
source$.connect();

// Console
// At pipe
// At first subscriber
// At second subscriber

Так как это похоже на код инициализации, который будет вызываться вашим приложением только один раз при запуске, хотядаже этот шаг может и не понадобиться.

...