У меня есть механизм опроса в RxJS6, который опрашивает API для новых элементов ( измерения ).Это работает как очарование, кроме: оно содержит состояние.Поэтому работает только первый вызов getMeasurements .Когда я вызываю метод во второй раз, используя другой TypeId , он начинает колебаться и запускать HTTP-запросы, пока мой браузер не зависнет.Вероятно, потому что внутренние подписки никогда не отписываются.Но как я могу объединить их без внутренних подписок?
API использует своего рода разбиение на страницы в ответе - поскольку список огромен - помимо списка он возвращает следующую ссылку, которую веб-интерфейс может использовать для получения следующего наборапредметов.Для клиента логика выглядит следующим образом:
- Если ответ содержит измерения, объедините их с текущими и сделайте новый запрос, используя nextlink , только что полученный и повторите.
- Если новые измерения не найдены, подождите 10 секунд для следующей попытки.
Это метод MeasurementService и getMeasurements , который яиспользовать для загрузки и опроса:
@Injectable()
export class MeasurementService {
loadTrigger$ = new Subject();
measurements$ = new Subject<Measurement[]>();
currentMeasurements: Measurement[] = [];
beginDate: Date;
nextlinkFormat: string = 'http://myapi?type=TYPEID';
nextlink: string;
constructor(private http: HttpClient) { }
public getMeasurements(typeId: TypeId): Observable<Measurement[]> {
console.info('MeasurementService.getMeasurements()', pollInterval);
this.nextlink = this.nextlinkFormat
.replace('TYPEID', typeId.toString())
this.currentMeasurements = [];
timer(0, pollInterval)
.subscribe(x => this.loadTrigger$.next());
this.loadTrigger$.pipe(
map(x => this.http.get(this.nextlink)),
)
.subscribe((response) => {
response.pipe(map((response) => response as MeasurementsDto)).
subscribe(
measurementsDto => {
let newMeasurements = measurementsDto.measurements.map(Measurement.fromMeasurementDto);
this.currentMeasurements.push.apply(this.currentMeasurements, newMeasurements);
this.nextlink = measurementsDto.nextlink;
if (newMeasurements.length > 0) {
console.log(newMeasurements.length + " new items");
this.loadTrigger$.next();
this.measurements$.next(this.currentMeasurements);
}
});
});
return this.measurements$.asObservable();
}
}