Объедините последние значения наблюдаемых ПОСЛЕ каждого излучения от наблюдаемого источника - PullRequest
3 голосов
/ 13 июня 2019

У меня есть источник Observable (на самом деле Subject), и когда этот Observable испускает значение, мне нужно запустить загрузку значений, что приводит к обновлению еще 2 наблюдаемых, и когда эта загрузка будет выполнена, возьмите значение источник и объедините его с последними значениями 2 «зависимых» наблюдаемых, но только значения из ПОСЛЕ того, как исходная наблюдаемая излучает. Поэтому, если у 2 зависимых наблюдаемых есть значения, когда исходный источник излучает, мне нужно подождать, пока 2 зависимых не получат обновления, прежде чем испускать самую последнюю из всех 3 наблюдаемых. Я пытался использовать различные заклинания withLatestFrom, switchMap и combLatest, но ничего не дает желаемого результата:

Это выдаст значения в правильной форме, но не в нужное время, он использует значения до того, как SelectExpenseSummary сделал свое дело, и costDetails $ и costReceipts $ обновились.

      this.editExpenseSubject.pipe(
        takeWhile(() => this.active),
         tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
         tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
         withLatestFrom(
          this.expenseDetails$,
          this.expenseReceipts$,
          )
        )
      .subscribe(x => {
        this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
      });

Это генерирует кучу раз, в последний раз правильные значения, что было бы нормально, но испускает неправильную форму, отсутствует источник, наблюдаемый из выходных данных:

      this.editExpenseSubject.pipe(
        takeWhile(() => this.active),
         tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
         tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
         switchMap(() =>
            combineLatest(this.expenseDetails$,
            this.expenseReceipts$,
            )
          )
        )
      .subscribe(x => {
        this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
      });

Это имеет правильную форму вывода, имеющую все 3 наблюдаемые, но никогда не генерирует ничего при подписке, я полагаю, из-за использования editExpenseSubject внутри:

      this.editExpenseSubject.pipe(
        takeWhile(() => this.active),
         tap(x => this.userStore.dispatch(new SelectExpenseSummary(x))),
         tap(x => this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:tap', x})),
         switchMap(x =>
          combineLatest(
            this.editExpenseSubject
            , this.expenseDetails$
            , this.expenseReceipts$
          )
        )
      )
      .subscribe(x => {
        this.log.verbose({type: 'ExpensesFeatureComponent:editExpenseSubject:subscribe', x});
      });

Может ли кто-нибудь указать мне правильное заклинание, чтобы получить то, что я ищу, желательно с подробным объяснением того, почему.


Обновление для дополнительной информации (думаю, это поможет вам пройти весь процесс):

    this.expenseDetails$ = this.expenseStore.pipe(
      select(fromExpenses.getExpenseLines)
    );
    this.expenseReceipts$ = this.expenseStore.pipe(
      select(fromExpenses.getExpenseReceipts)
    );

export const getExpenseLines = createSelector(StateFeatureSelector, x => x.expenseLines);
export const getExpenseReceipts = createSelector(StateFeatureSelector, x => x.expenseReceipts);

export interface State {
  expenseSummaries: Array<IExpenseSummary>;
  expenseLines: Array<IExpenseLine>;
  expenseReceipts: Array<IExpenseReceipt>;
  selectedUser: IDirectReport;
  selectedExpenseSummary: IExpenseSummary;
}


  @Effect() LoadExpenseLines$ = this.actions$.pipe(
    ofType<fromExpense.LoadExpenseLines>(fromExpense.ActionTypes.LoadExpenseLines)
    , tap(action => this.log.debug({type: 'ExpenseEffects:LoadExpenseLines$:filtered', action}))
    , mergeMap(x =>
      this.service.getExpenseLines(x && x.payload)
        .pipe(
          tap(receipts => this.log.debug({type: 'ExpenseEffects:getExpenseLines', receipts}))
          , map(lineItems => new fromExpense.SetExpenseLines(lineItems))
        )
    )
  );
  @Effect() LoadExpenseReceipts$ = this.actions$.pipe(
    ofType<fromExpense.LoadExpenseReceipts>(fromExpense.ActionTypes.LoadExpenseReceipts)
    , tap(action => this.log.debug({type: 'ExpenseEffects:LoadExpenseReceipts$:filtered', action}))
    , mergeMap(x =>
      this.service.getExpenseReceipts(x && x.payload)
        .pipe(
          tap(receipts => this.log.debug({type: 'ExpenseEffects:getExpenseReceipts', receipts}))
          , map(receipts => new fromExpense.SetExpenseReceipts(receipts))
        )
    )
  );

  @Effect() SelectExpenseSummary$ = this.actions$.pipe(
    ofType<fromExpense.SelectExpenseSummary>(fromExpense.ActionTypes.SelectExpenseSummary)
    , tap(action => this.log.debug({type: 'ExpenseEffects:SelectExpenseSummary$:filtered', action}))
    , mergeMap(x =>
        [
          new fromExpense.LoadExpenseLines(x.payload)
          , new fromExpense.LoadExpenseReceipts(x.payload)
        ]
    )
  );

export class SelectExpenseSummary implements Action {
  readonly type = ActionTypes.SelectExpenseSummary;
  constructor(public payload: IExpenseSummary) {}
}

export class LoadExpenseLines implements Action {
  readonly type = ActionTypes.LoadExpenseLines;
  constructor(public payload: IExpenseSummary) {}
}
export class SetExpenseLines implements Action {
  readonly type = ActionTypes.SetExpenseLines;
  constructor(public payload: Array<IExpenseLine>) {}
}

export class LoadExpenseReceipts implements Action {
  readonly type = ActionTypes.LoadExpenseReceipts;
  constructor(public payload: IExpenseSummary) {}
}

export class SetExpenseReceipts implements Action {
  readonly type = ActionTypes.SetExpenseReceipts;
  constructor(public payload: Array<IExpenseReceipt>) {}
}

export function reducer (state = initialState, action: actions.ActionsUnion): State {
  switch (action.type) {
// ...  other actions cut
    case actions.ActionTypes.SetExpenseLines:
      return {
        ...state,
        expenseLines: action.payload && [...action.payload] || []
      };
    case actions.ActionTypes.SetExpenseReceipts:
      return {
        ...state,
        expenseReceipts: action.payload && [...action.payload] || []
      };
    default:
      return state;
  }
}

// from the service class used in the effects

  getExpenseLines(summary: IExpenseSummary): Observable<Array<IExpenseLine>> {
    this.log.debug({type: 'ExpenseService:getExpenseLines', summary, uri: this.detailUri});
    if (summary) {
      return this.http
        .post<Array<IExpenseLine>>(this.detailUri, {ExpenseGroupId: summary.ExpReportNbr})
        .pipe(
          tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseLines:reponse', data }))
        );
    } else {
      this.log.log({ type: 'ExpenseService:getExpenseLines null summary'});
      return of<Array<IExpenseLine>>([])
      .pipe(
        tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseLines:reponse', data }))
      );
    }
  }
  getExpenseReceipts(summary: IExpenseSummary): Observable<Array<IExpenseReceipt>> {
    this.log.debug({type: 'ExpenseService:getExpenseReceipts', summary, uri: this.receiptUri});
    if (summary) {
      return this.http
      .post<Array<IExpenseReceipt>>(this.receiptUri, {ExpenseGroupId: summary.ExpReportNbr})
      .pipe(
        tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseReceipts:reponse', data }))
      );
    } else {
      this.log.log({ type: 'ExpenseService:getExpenseReceipts null summary'});
      return of<Array<IExpenseReceipt>>([])
      .pipe(
        tap(data => this.log.verbose({ type: 'ExpenseService:getExpenseReceipts:reponse', data }))
      );
    }
  }

Ответы [ 2 ]

2 голосов
/ 25 июня 2019

Я исхожу из предположения @ Reactgular:

Вы хотите взять излученное значение у субъекта, а затем излучить из 2 других наблюдаемых их последнее значение.Выходные данные представляют собой массив из 3 элементов.

Вы хотите, чтобы 2 внутренние наблюдаемые извлекали новые данные после того, как внешняя наблюдаемая излучает, и вам нужно только их первое значение.

Оттам, оператор, который приходит на ум, это skip: он пропускает количество значений, которые вы просили пропустить, и продолжается после.

Я сделал песочницу , чтобы вы могли увидеть ее в действии: как вы можете видеть, обе наблюдаемые обновляются из одного источника, и вы получаете только одно событие, которое является последнимзначения каждого наблюдаемого.

связанный код:

import { BehaviorSubject, combineLatest, fromEvent } from "rxjs";
import {
  delayWhen,
  skip,
  distinctUntilKeyChanged,
  distinctUntilChanged,
  skipUntil,
  switchMap,
  map
} from "rxjs/operators";

const counts = {
  source: 0,
  first: 0,
  second: 0
};

const source$ = new BehaviorSubject("source = " + counts.source);
const first$ = new BehaviorSubject("first = " + counts.first);
const second$ = new BehaviorSubject("second = " + counts.second);

// Allow the source to emit
fromEvent(document.querySelector("button"), "click").subscribe(() => {
  counts.source++;
  source$.next("source = " + counts.source);
});

// When the source emits, the others should emit new values
source$.subscribe(source => {
  counts.first++;
  counts.second++;
  first$.next("first = " + counts.first);
  second$.next("second = " + counts.second);
});

// Final result should be an array of all observables

const final$ = source$.pipe(
  switchMap(source =>
    first$.pipe(
      skip(1),
      switchMap(first =>
        second$.pipe(
          skip(1),
          map(second => [source, first, second])
        )
      )
    )
  )
);

final$.subscribe(value => console.log(value));
1 голос
/ 13 июня 2019

Если я вас правильно понимаю ...

Вы хотите взять излученное значение от субъекта, а затем излучить от 2 других наблюдаемых их последнее значение.Выходные данные представляют собой массив из 3 элементов.

Вы хотите, чтобы 2 внутренние наблюдаемые извлекали новые данные после того, как внешняя наблюдаемая излучает, и вам нужно только их первое значение.

 this.editExpenseSubject.pipe(
        switchMap(editExpense =>
            forkJoin(
               this.expenseDetails$.pipe(first()),
               this.expenseReceipts$.pipe(first()),
            ).pipe(
               map(values => ([editExpense, ...values]))
            )
          )
        )

Выможно использовать switchMap () , чтобы он вызывал подписку на внутреннюю наблюдаемую область, и использовать forkJoin () для получения окончательных значений.Добавление оператора first () для ограничения наблюдаемых до 1 значения.Затем вы используете map () , чтобы поместить внешнее значение обратно в массив результатов.

...