Буферизуйте и уменьшайте значения при переключении на другую наблюдаемую - PullRequest
0 голосов
/ 12 декабря 2018

Допустим, у нас есть некоторая input$ наблюдаемая, которая излучает Item:

const input$: Observable<Item>;

Для каждого излучения мне нужно переключиться на другую наблюдаемую (сродни switchMap).Однако мне нужно убедиться, что все эти переключаемые наблюдаемые завершены и работают в последовательности.Достаточно легко, у нас есть concatMap для достижения этой цели:

input$.pipe(concatMap(item => processItem(item)))

Однако, что я хотел бы сделать вместо этого: буферизировать элементы и уменьшать их (т.е. у меня есть функция (a: Item, b: Item): Item), пока один изэти переключаемые наблюдаемые активны.В частности, предположим, type Item = {[key: string]: string}.В этом случае мой редуктор будет просто {...a, ...b}.

У нас есть много операторов buffer*, window* и throttle*, хотя я не могу найти простой комбинация для достижения этого поведения.

Я мог бы очень хорошо написать свой пользовательский оператор, но меня интересует , возможно ли вместо этого выразить это как (simple-ish) комбинацию некоторыхвстроенные операторы ?

Просто чтобы прояснить: наблюдаемый вывод должен выдавать значение наблюдаемого, на которое мы переключились, а не буферизованные / уменьшенные значения.Кроме того, хотя завершение / ошибка источника должны отражаться в выходных данных, любая текущая внутренняя подписка должна заканчиваться первой.

Оператор, которого я ищу, должен иметь подпись, аналогичную

bufferedConcatMap<T, R>(
    project: (value: T) => Observable<R>, 
    reducer: (values: T[]) => T
): OperatorFunction<T, R>;

Для полноты картины приведу мраморную диаграмму оператора, которого я ищу.Это предполагает добавление в качестве редуктора, и мы просто переключаемся на вход, но с задержкой в ​​четыре такта:

Input:  ---123--|
Output: ------1--(5|)

Здесь 1 немедленно переключается на нашу задержку (так как не происходитвнутренняя подписка), и через четыре галочки мы получим результат.Поскольку, в то же время, 2 и 3 были выпущены, они буферизуются вместе и уменьшаются до 2 + 3 = 5, что снова испускается через четыре такта, так как мы переключились на это только после возвращения 1.

1 Ответ

0 голосов
/ 12 декабря 2018

Обновление 12/13 : поскольку я придерживаюсь предположения о том, что простая комбинация встроенных операторов здесь не справится, я реализовал свой собственный оператор.Вопреки моему первоначальному требованию, это ведет себя следующим образом:

  • Если источник завершает работу, выходные данные сначала ждут завершения активной внутренней подписки, прежде чем завершить внешнюю наблюдаемую.
  • Еслиисходные ошибки, внешняя наблюдаемая немедленно распространит ошибку.Это в большей степени согласуется с такими операторами, как concatMap и exhaustMap.

Мне еще предстоит написать набор тестов для этого, но пока, похоже, он работает нормально.

Я выложу здесь код оператора, вы также можете найти здесь игровую площадку Stackblitz .

type Reducer<A, B> = (values: A[]) => B;
type Project<A, B> = (value: A) => ObservableInput<B>;

export function bufferReduceMap<A, B, R>(reducer: Reducer<A, B>, project: Project<B, R>): OperatorFunction<A, R> {
  return function (source: Observable<A>) {
    return source.lift(new BufferReduceMapOperator<A, B, R>(reducer, project));
  };
}

class BufferReduceMapOperator<A, B, R> implements Operator<A, R> {

  constructor(private reducer: Reducer<A, B>, private project: Project<B, R>) {}

  call(subscriber: Subscriber<R>, source: any): TeardownLogic {
    return source.subscribe(new BufferReduceMapSubscriber<A, B, R>(subscriber, this.reducer, this.project));
  }

}

class BufferReduceMapSubscriber<A, B, R> extends OuterSubscriber<A, B> {

  private buffer: A[] = [];
  private active = false;
  private hasCompleted = false;
  private hasErrored = false;

  constructor(
    destination: Subscriber<R>,
    private reducer: Reducer<A, B>,
    private project: Project<B, R>,
  ) {
    super(destination);
  }

  protected _next(value: A) {
    const buffer = this.buffer;
    buffer.push(value);

    this._tryNext();
  }

  protected _complete() {
    this.hasCompleted = true;
    if (!this.active && this.buffer.length === 0) {
      this.destination.complete();
    }

    this.unsubscribe();
  }

  public notifyComplete(innerSub: Subscription) {
    this.remove(innerSub);

    this.active = false;
    if (this.buffer.length !== 0) {
      this._tryNext();
    } else if (this.hasCompleted) {
      this.destination.complete();
    }
  }

  protected _tryNext() {
    if (this.active) {
      return;
    }

    let reduced: B;
    try {
      reduced = this.reducer(this.buffer);
    } catch (err) {
      this.destination.error(err);
      return;
    }

    let result: ObservableInput<R>;
    try {
      result = this.project(reduced);
    } catch (err) {
      this.destination.error(err);
      return;
    }

    this.active = true;

    const innerSubscriber = new InnerSubscriber(this, undefined, undefined);

    const destination = this.destination as Subscription;
    destination.add(innerSubscriber);

    this.buffer = [];
    subscribeTo<R>(result)(innerSubscriber);
  }

}
...