RxJS: Наблюдаемый поток передается в groupBy (), за которым следует concatMap ();данные для последующих ключей потеряны - PullRequest
0 голосов
/ 28 сентября 2018

Я пытаюсь использовать оператор RxJS groupBy, а затем concatMap для сбора записей в отдельные группы на основе некоторых ключей.

Я заметил, что когда concatMap следует за оператором groupByКажется, данные теряются для всех ключей, которые появляются после первого.

Например:

Рассмотрим следующий блок кода:

// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1)),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
// 
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3

Однакокогда я использую оператор concatMap сам по себе, он ведет себя как ожидалось.

// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();

const result = clicks.pipe(
  concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));

// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3

Считывание документации для RxJS groupBy и concatMap не дает мне никаких подсказок относительно того, что здесь может происходить.Принимая во внимание, что раздел о RxJS concatMap в реагирующий x.io заставляет меня поверить, что это должно работать.

Может кто-нибудь помочь мне понять, что происходит с первым сценарием здесь?Как мне заставить работать первый сценарий?

Ответы [ 2 ]

0 голосов
/ 24 июля 2019

Мой ответ - дополнить Киран и отметить, что вы получите точно такую ​​же проблему, как описано в вопросе, если вы используете асинхронную MergeMap.

когда вы используете groupBy, как объясняет Кирен, он внутренне создает Subject, который немедленно подписывается на источник.Следующие работы ...

source.pipe(
  groupBy(item => item.id),
  mergeMap(byId => {
    return byId.pipe(map(x=>service.put(x)));
  }),

... потому что (из того, что я могу собрать) подписки являются синхронными - mergeMap подписывается на каждую новую группировку немедленно (при условии отсутствия ограничений параллелизма) и поэтому ловит данные,

Если вы хотите сделать что-то асинхронно, для каждой группы вы можете попробовать ...

source.pipe(
  groupBy(item => item.id),
  mergeMap(async byId => {
    let service = await this.getSomething(byId.key); 
    return byId.pipe(map(x=>service.put(x)));
  }),
  mergeAll()

... с этого момента подписка на наблюдаемую группировку будет отложена до mergeAllи он пропустит исходные данные.

Решение точно такое, как говорит Киран: вы должны использовать объект буферизации, чтобы значения могли быть воспроизведены, когда группа, наконец, тоже подписана: groupBy(item => item.id, null, null,()=>new ReplaySubject()) будет работать отлично.

Мое личное решение, родившееся , не желающее буферизации после этой первоначальной подписки , заключалось в создании пользовательского BufferSubject, который буферизуется только до этой первой подписки и затем просто проходит через nextна базу Тема.

/** buffers items until the first subscription, then replays them and stops buffering */
export class BufferSubject<T> extends Subject<T>{
  private _events: T[] = [];

  constructor(private scheduler?: SchedulerLike) {
    super();
  }

  next(value: T) {
    this._events.push(value);    
    super.next(value);
  }

  _subscribe(subscriber: Subscriber<T>): Subscription {
    const _events =  this._events;

    //stop buffering
    this.next = super.next;
    this._events = null;    

    const scheduler = this.scheduler;
    const len = _events.length;
    let subscription: Subscription;

    if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else if (this.isStopped || this.hasError) {
      subscription = Subscription.EMPTY;
    } else {
      this.observers.push(subscriber);
      subscription = new SubjectSubscription(this, subscriber);
    }

    if (scheduler) {
      subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
    }

    for (let i = 0; i < len && !subscriber.closed; i++) {
      subscriber.next(_events[i]);
    }

    if (this.hasError) {
      subscriber.error(this.thrownError);
    } else if (this.isStopped) {
      subscriber.complete();
    }

    return subscription;
  }
}

/** from rxjs internals */
export class SubjectSubscription<T> extends Subscription {
  closed: boolean = false;

  constructor(public subject: Subject<T>, public subscriber: Observer<T>) {
    super();
  }

  unsubscribe() {
    if (this.closed) {
      return;
    }

    this.closed = true;

    const subject = this.subject;
    const observers = subject.observers;

    this.subject = null;

    if (!observers || observers.length === 0 || subject.isStopped || subject.closed) {
      return;
    }

    const subscriberIndex = observers.indexOf(this.subscriber);

    if (subscriberIndex !== -1) {
      observers.splice(subscriberIndex, 1);
    }
  }
}

и используется вместо переигровки:

groupBy(item => item.id, null, null,()=>new BufferSubject())
0 голосов
/ 30 сентября 2018

Я, наконец, выяснил, в чем здесь проблема.

В сценарии № 1 в приведенном выше вопросе код сначала направляет исходный поток в оператор groupBy, а затем concatMap оператор.И эта комбинация операторов, кажется, вызывает эту проблему.

Внутренняя работа groupBy и mergeMap

Считывание кода для оператора groupBy Я понял, что groupBy внутренне создает новый экземпляр Subject для каждого ключа, найденного в исходном потоке.Все значения, принадлежащие этому ключу, немедленно отправляются этим экземпляром Subject.

Все экземпляры Subject заключаются в GroupedObservale с и отправляются в нисходящем направлении оператором groupBy.Этот поток GroupedObservable экземпляров является входом для оператора concatMap.

Оператор concatMap внутренне вызывает оператор mergeMap со значением 1 для concurrency, что означает только один источникobservable подписывается одновременно.

Оператор mergeMap подписывается только на одну наблюдаемую или столько наблюдаемых, сколько разрешено параметром conccurency, и хранит все остальные наблюдаемые в «буфере» до первой.завершено.

Как это создает проблему?

Во-первых, теперь, когда я прочитал код для этих операторов, я не слишком уверен, если это «проблема».

Тем не менее поведение, которое я описал в этом вопросе, происходит потому, что, хотя оператор groupBy генерирует отдельные значения, используя соответствующий экземпляр Subject, оператор mergeMap не подписался бы на этот конкретный Subject.Следовательно, все значения из исходного потока, которые испускаются с использованием этого Subject, теряются.

Я попытался проиллюстрировать эту проблему на грубой мраморной диаграмме: groupBy to concatMap issue

Это не «проблема» с тем, как работают эти операторы, но, возможно, с тем, как я понимал эти операторы и, возможно, с документацией (особенно с документацией для concatMap, которая может немного сбивать с толку тех, кто не знаком с RxJS).

Это можно легко исправить, если заставить оператор groupBy использовать ReplaySubject вместо Subject для выдачи сгруппированных значений.groupBy принимает параметр subjectSelector, который позволяет нам переключать экземпляр Subject с экземпляром ReplaySubject.

Работает следующий код:

// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();

// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }

Почему сценарий2 работа?

Сценарий 2 в моем вопросе работает нормально, потому что interval просто создает Observable, но не начинает излучать значения.Следовательно, все значения из этого Observable доступны, когда mergeMap наконец подписывается на него.

...