Как ограничить внутренние подписки mergeMap самой последней N или скользящей оконной очередью - PullRequest
2 голосов
/ 21 апреля 2020

У меня есть исходный поток, объединенный из двух потоков. Когда исходящий поток генерирует событие, я хотел бы вызвать функцию подписки Meteor.subscribe и оставить ее открытой, поэтому я использую mergeMap. Когда подписка готова, я отправляю трубку другому mergeMap, чтобы заполнить данные. Это работает хорошо, пока я не сделаю 100 щелчков, и потребление памяти стремительно растет. Вопрос в том, как можно ограничить mergeMap не первыми N подписками на concurrent: Number, а N последними, как скользящее окно?

function paginationCache$(): Observable<any> {

    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes
                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('my/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);

                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                });

            })
        );
}

Я хотел бы дать более подробное объяснение того, что происходит в этом коде.

В моем примере поток source (merge перед каналом) никогда не завершается, пока я нажимаю кнопку в своем веб-интерфейсе, поэтому он выдает изменения при нажатии следующей или предыдущей кнопки в моем интерфейсе. Сначала mergeMap получает изменения из исходного потока и отправляет их в бэкэнд-API (который также имеет конфликтную публикацию / подписку именования). Поэтому, когда данные доступны на клиенте, я звоню observer.next(subscription), чтобы перейти ко второму mergeMap, но я не могу уничтожить или остановить подписку метеора. Две причины: 1. Я хотел бы получить изменения в реальном времени для выбранных данных, 2. Если я остановлю подписку Meteor, данные на стороне клиента будут удалены. Итак, теперь секунда mergeMap постоянно обновляет выбранные данные, если они были обновлены на сервере.

Так что после каждого нажатия кнопки пользовательского интерфейса (следующий, предыдущий) у меня появляется новая цепочка подписок. Это хорошо, если исходная таблица данных не большая (1000 записей), и я просто нажал пару раз. Но у меня может быть больше 30000, и я могу нажимать на кнопки много раз.

Итак, идея состоит в том, чтобы сделать mergeMap похожим на очередь ограниченного размера, которая содержит только последние N подписок, но очередь меняется все время, когда я нажимаю кнопку.

LAST EDIT: рабочий код:

function paginationCache$(): Observable<any> {
    const N = 3;
    const subscriptionsSubject = new Subject();
    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes

                subscriptionsSubject.next();

                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);
                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                }).pipe(
                    takeUntil(subscriptionsSubject
                        .pipe(
                            take(N),
                            filter((_, idx) => idx === N - 1)
                        )
                    )
                );
            })
        );
}

Ответы [ 2 ]

1 голос
/ 21 апреля 2020

Без учета вашего фрагмента, вот как я бы go об этом:

не на первые N подписок по параллельному номеру: Number, а на N недавних, как скользящее окно

Если я правильно понял, вы бы хотели что-то вроде этого (предположим, N = 3):

N = 3

Crt             |  1 |  2 |  3 |
Subscriptions   | S1 | S2 | S3 |


When Crt = 4

Crt           | 2  | 3  |  4 |
Subscriptions | S2 | S3 | S4 |

Если это так, вот как я бы это решил:

const subscriptionsSubject = new Subject();

src$.pipe(
  mergeMap(
    data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
      .pipe(
        takeUntil(
          subscriptionsSubject.pipe(
            take(N), // After `N` subscriptions, it will complete
            filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
          )
        )
      )
  )
)
0 голосов
/ 21 апреля 2020

У меня есть две идеи:

  1. Вы не завершаете второе внутреннее Наблюдаемое. Я думаю, это не должно быть источником вашей проблемы, но лучше заполнить наблюдателей, если вы можете:

    return () => {
      subscription.stop();
      observer.complete();
    };
    
  2. Вы можете использовать bufferCount, чтобы сделать скользящее окно Observables а затем подпишитесь на них с switchMap(). Что-то вроде этого:

    import { of, range } from 'rxjs'; 
    import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators';
    
    range(10)
      .pipe(
        // turn each value to an Observable
        // `refCount` is used so that when `switchMap` switches to a new window
        // it won't trigger resubscribe to its sources and make more requests.
        map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))),
        bufferCount(3, 1),
        tap(console.log), // for debugging purposes only
        switchMap(sourcesArray => merge(...sourcesArray)),
      )
      .subscribe(console.log);
    

    Демонстрация в реальном времени: https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60

    Я не совсем уверен, что это имитирует ваш вариант использования, но я попытался также включить shareReplay, чтобы не вызывать несколько вызовов Meteor.subscribe для одной и той же наблюдаемой. Мне нужно было бы иметь рабочую демонстрацию вашего кода, чтобы протестировать его сам.

...