У меня следующая проблема:
Многие вызовы API проходят через интерфейс API (Google API) и должны быть ограничены в запросе в секунду / параллелизм из-за ограничения API Google.
Я использую тему ( сток / пул вызовов), которая управляет всеми API-запросами с помощью mergeMap и возвращает результат другому субъекту по каналу.
Поскольку запросы API могут отписаться до того, как они завершатся, они не должны блокировать мой приемник. Поэтому я должен остановить запрос API ( task ) после отписки.
Вопрос:
Я не знаю, как правильно зафиксировать это отписанное состояние. Сейчас я перезаписываю подписки и отмены подписки , чтобы перехватить это состояние. Это работает, но для меня это не выглядит как "rxjs".
Что я могу улучшить?
import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';
function doHeavyRequest() {
return new Observable(subscribe => {
// Simulate delay.
setTimeout(() => {
subscribe.next(1);
subscribe.complete();
}, 1000);
});
}
const sink = new Subject<[Subject<any>, number]>();
sink.pipe(
mergeMap(([subject, id]) => {
// Stop request here if already unsubscribed.
if (subject.closed) {
console.log('Request cancelled:', id);
return EMPTY;
}
return doHeavyRequest()
.pipe(
tap(res => {
if (!subject.closed) {
subject.next(res);
subject.complete();
} else {
console.log('Request aborted:', id);
}
})
);
}, 2)
).subscribe();
// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscribe over the flag alive.
function getSomething(id: number) {
const task = new Subject();
const ob = task.asObservable();
ob.subscribe = (...args: any[]) => {
const sub = Observable.prototype.subscribe.call(ob, ...args);
sub.unsubscribe = () => {
if (!task.isStopped)
task.unsubscribe();
Subscription.prototype.unsubscribe.call(sub);
};
return sub;
};
sink.next([task, id]);
return ob;
}
// Make 3 requests and unsubscribe.
export function test() {
const ob0 = getSomething(0);
const ob1 = getSomething(1);
const ob2 = getSomething(2);
const sub0 = ob0.subscribe(e => {
console.log('0:', e);
});
setTimeout(() => sub0.unsubscribe(), 1500);
const sub1 = ob1.subscribe(e => {
console.log('1:', e);
});
setTimeout(() => sub1.unsubscribe(), 900);
const sub2 = ob2.subscribe(e => {
console.log('2:', e);
});
setTimeout(() => sub2.unsubscribe(), 100);
}
См. test.ts на plunker и вывод консоли:
https://next.plnkr.co/edit/KREjMprTrjHu2zMI?preview