RxJS: Observable, который принимает в качестве входных данных свое старое значение - PullRequest
5 голосов
/ 22 сентября 2019

Я пытаюсь реализовать что-то вроде этого:

// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  map(([mValue, oldAValue]) => {
    // Do something.
  }),
  rxjs.operators.startWith(aInitial)
);

Теперь это явно некомпилируемая ерунда, поскольку мы пытаемся sample a перед ее созданием, но, надеюсь, это прояснит мои намерения:Каждое излучаемое значение из a должно зависеть от одного из старых значений, ранее выданных a.Какое старое значение a, это определяется тем, когда в последний раз c что-то излучал.Это похоже на вызов pairwise на a, за исключением того, что я не хочу, чтобы последние два значения были последними, а другое - последующим.

Обратите внимание, что если не для startWith(aInitial) битов,это даже не будет четко определенным вопросом, поскольку первое значение, испускаемое a, будет определяться циклически, ссылаясь на себя.Однако, пока первое значение a указано отдельно, конструкция имеет математический смысл.Я просто не знаю, как правильно реализовать это в коде.Я догадываюсь, что был бы изощренный способ сделать это, написав какой-нибудь пользовательский предмет, но что-то более изящное было бы очень кстати.

Чтобы сделать это немного более конкретным, в моем использованиислучай, когда я имею дело с элементом пользовательского интерфейса типа «щелкни и перетащи в кастрюлю».m - это наблюдаемая из mousemove событий, c - это наблюдаемая из mousedown событий.a будет затем изменяться в зависимости от того, где находится курсор и какое значение было a, когда происходил щелчок.

Ответы [ 3 ]

1 голос
/ 22 сентября 2019

Если я правильно понимаю, основными потоками событий являются mousemove и mousedown.

На основе этих потоков событий вы должны рассчитать новый поток a, который излучает ста же частота mousemove, но чьи данные являются результатом некоторых вычислений, основанных на текущем положении мыши и значении a, которое имело при последнем излучении mousedown.

Итак, если этоправда, мы можем смоделировать mousemove и mousedown с помощью следующих наблюдаемых

// the mouse is clicked every 1 second
const c = interval(1000).pipe(
    tap(cVal => console.log('click', cVal))
);
// the mouse moves diagonally and emits every 200 ms
const m = interval(200).pipe(
    map(mVal => [mVal, mVal]),
);

Нам нужно как-то иметь под рукой значение Наблюдаемой a, когда mousedown излучает.Как мы можем получить это?

Давайте предположим, что у нас есть BehaviourSubject с именем value_of_a, начальное значение которого 1 и которое содержит значение a.Если бы у нас было такое Observable, мы могли бы получить его значение, когда mousedown излучает просто так

const last_relevant_a = c.pipe(       // when mousedown emits
    switchMap(() => value_of_a.pipe(  // the control is switched to the Observable value_of_a
        take(1),                      // and we consider only its first value
    )),
);

С m, Observable при перемещении мыши и last_relevant_a у нас есть все Observable, которые нам нужны,Фактически нам нужно просто объединить их последние выбросы, чтобы получить все элементы, необходимые для вычисления нового значения a.

const a = combineLatest(m, last_relevant_a)
.submit(
   ([mouseVal, old_a_val] => {
      // do something to calculate the new value of a
   }
);

Теперь осталось только убедиться, что value_of_a испускает любое значение, испускаемое a.Это можно сделать, вызвав next на value_of_a в рамках самой подписки a.

Сшивая все вместе, решение может быть примерно таким:

const c = interval(1000).pipe(
    tap(cVal => console.log('click', cVal))
);

const m = interval(200).pipe(
    map(mVal => [mVal, mVal]),
);

const value_of_a = new BehaviorSubject<number>(1);

const last_relevant_a = c.pipe(
    switchMap(cVal => value_of_a.pipe(
        take(1),
    )),
);

const a = combineLatest(m, last_relevant_a);

a.pipe(
    take(20)
)
.subscribe(
    val => {
        // new value of a calculated with an arbitrary logic
        const new_value_of_a = val[0][0] * val[0][1] * val[1];
        // the new value of a is emitted by value_of_a
        value_of_a.next(new_value_of_a);
    }
)

Возможно, этотакже вариант использования для оператора expand, но его следует изучить.

0 голосов
/ 22 сентября 2019

Подумав немного, вот мое собственное решение вместе с минимальным рабочим примером:

// Setup for MWE.
// Note that c just emits once a second, the actual values of it don't matter.
const c = rxjs.interval(1000).pipe(rxjs.operators.take(2));
const m = rxjs.interval(300).pipe(rxjs.operators.take(9));
const aInitial = -1;

// Initialize a.
const a = new rxjs.BehaviorSubject();
a.next(aInitial);

// For testing, log the values a emits.
a.subscribe((val) => {
  console.log(`a: ${val}`);
});

// Sample a at each emission of c.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial),
);

// Every time m emits, get also the latest sampled value of a, and do something
// with the two.
m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  rxjs.operators.map(([mValue, oldAValue]) => {
    // In place of actually computing something useful, just log the arguments
    // and for the sake of demonstration return their product.
    console.log(`m: ${mValue}, old a at last c-emission: ${oldAValue}`);
    return mValue*oldAValue;
  }),
).subscribe(a);  // Route the output back into a.

, который выводит

a: -1
m: 0, old a at last c-emission: -1
a: 0
m: 1, old a at last c-emission: -1
a: -1
m: 2, old a at last c-emission: -1
a: -2
m: 3, old a at last c-emission: -2
a: -6
m: 4, old a at last c-emission: -2
a: -8
m: 5, old a at last c-emission: -2
a: -10
m: 6, old a at last c-emission: -10
a: -60
m: 7, old a at last c-emission: -10
a: -70
m: 8, old a at last c-emission: -10
a: -80

Хитрость в том, что настройка a upпоскольку субъект позволяет нам сначала выдать его начальное значение, а затем подключить его к каналу, который имеет в качестве одного из своих входов более старые, выборочные значения a.Обычные наблюдаемые не позволяют этого (AFAIK), потому что вы должны определить все их входные данные во время создания.Тем не менее, нет необходимости в предметном классе cusom.

0 голосов
/ 22 сентября 2019

Вы можете создать новую тему, названную предыдущей, на основе темы поведения.

Источник темы поведения находится здесь https://github.com/ReactiveX/rxjs/blob/master/src/internal/BehaviorSubject.ts.

Итак, давайте создадим предыдущую тему, которая испускаеттекущее значение вместе с предыдущим значением.

import { Subject } from 'rxjs';
import { Subscriber } from 'rxjs';
import { Subscription } from 'rxjs';
import { SubscriptionLike } from 'rxjs';
import { ObjectUnsubscribedError } from 'rxjs';

export class PreviousSubject<T> extends Subject<T[]> {

  _value: T[];

  constructor(value: T, previous?: T) {
    super();
    this._value = [value, previous];
  }

  get value(): T[] {
    return this.getValue();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T[]>): Subscription {
    const subscription = super._subscribe(subscriber);
    if (subscription && !(<SubscriptionLike>subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue(): T[] {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value: T): void {
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

См. демонстрацию в https://stackblitz.com/edit/typescript-wiayqx

Или ванильный JS, если вы не хотите TypeScript

const { Subject } = rxjs;

class PreviousSubject extends Subject {

  _value;

  constructor(value, previous) {
    super();
    this._value = [value, previous];
  }

  get value() {
    return this.getValue();
  }

  _subscribe(subscriber) {
    const subscription = super._subscribe(subscriber);
    if (subscription && !(subscription).closed) {
      subscriber.next(this._value);
    }
    return subscription;
  }

  getValue() {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }

  next(value) {
    this._value = [value, this._value[0]];
    super.next(this._value);
  }
}

let ps$ = new PreviousSubject('Start value');

ps$.subscribe(([current, previous]) => {
  console.log(`Current: ${current}, Previous: ${previous}`);
});

ps$.next('Value');
ps$.next('Another value');
ps$.next('Another value again');
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
...