У меня есть исходный поток, объединенный из двух потоков. Когда исходящий поток генерирует событие, я хотел бы вызвать функцию подписки Meteor.subscribe
и оставить ее открытой, поэтому я использую mergeMap
. Когда подписка готова, я отправляю трубку другому mergeMap
, чтобы заполнить данные. Это работает хорошо, пока я не сделаю 100 щелчков, и потребление памяти стремительно растет. Вопрос в том, как можно ограничить mergeMap не первыми N подписками на concurrent: Number
, а N последними, как скользящее окно?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
Я хотел бы дать более подробное объяснение того, что происходит в этом коде.
В моем примере поток source (merge
перед каналом) никогда не завершается, пока я нажимаю кнопку в своем веб-интерфейсе, поэтому он выдает изменения при нажатии следующей или предыдущей кнопки в моем интерфейсе. Сначала mergeMap
получает изменения из исходного потока и отправляет их в бэкэнд-API (который также имеет конфликтную публикацию / подписку именования). Поэтому, когда данные доступны на клиенте, я звоню observer.next(subscription)
, чтобы перейти ко второму mergeMap
, но я не могу уничтожить или остановить подписку метеора. Две причины: 1. Я хотел бы получить изменения в реальном времени для выбранных данных, 2. Если я остановлю подписку Meteor, данные на стороне клиента будут удалены. Итак, теперь секунда mergeMap
постоянно обновляет выбранные данные, если они были обновлены на сервере.
Так что после каждого нажатия кнопки пользовательского интерфейса (следующий, предыдущий) у меня появляется новая цепочка подписок. Это хорошо, если исходная таблица данных не большая (1000 записей), и я просто нажал пару раз. Но у меня может быть больше 30000, и я могу нажимать на кнопки много раз.
Итак, идея состоит в том, чтобы сделать mergeMap похожим на очередь ограниченного размера, которая содержит только последние N подписок, но очередь меняется все время, когда я нажимаю кнопку.
LAST EDIT: рабочий код:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}