Observable объединяет несколько вызовов функций в один Observable - PullRequest
0 голосов
/ 06 июня 2018

У меня есть функция, которая выполняет http-запрос на основе параметра.И я хочу добавить какую-то функциональность "debounce".Поэтому, если функция вызывается несколько раз в заданном временном окне, я хочу объединить параметры в один запрос, а не делать несколько запросов.

Я хочу добиться этого с помощью Observables и Angular.Это не кажется сложным, однако я не могу запустить его, может быть, я что-то упустил.

А пока давайте просто пропустим объединение в одном запросе, поскольку это можно сделать с агрегированнымdebounce или Oberservable.buffer.У меня проблемы с объединением отдельных Observables.

Вот то, что я пробовал до сих пор.

Я пытался использовать Subject, так как это казалось подходящим объектом для этого случая (https://stackblitz.com/edit/angular-hcn41v?file=src%2Fapp%2Fapp.component.ts).

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

Я ожидаю иметь один журнал 'getUrl Call' для каждого вызова функции и один журнал результатов.Однако я получаю результаты, только если я добавлю более 1 вызовов к this.makeRequest () и результат будеттакже странно. Все предыдущие значения также всегда возвращаются. Мне кажется, я не до конца понимаю, как Subject работает в этом случае.

Другой подход (взят отсюда RXJS: Aggregated debounce )было создать какой-то совокупный дебад (https://stackblitz.com/edit/angular-mx232d?file=src/app/app.component.ts)

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

В этом сценарии у меня проблема, я также получаю все предыдущие значения.

В теории (по крайней мере,для меня) оба варианта звучат правдоподобно, однако кажется, что я что-то упустил. Любой миг в правильном направлении высоко ценится.

Редактировать:

По запросуЯ добавил финалЦель al-world.

Представьте себе сервис, который запрашивает информацию у API.В течение 50-75мс вы звоните в службу с определенным идентификатором.Я хочу сгруппировать эти идентификаторы вместе в один запрос вместо выполнения 3. И если через 100 мс будет сделан другой вызов в службу, будет выполнен новый запрос

Ответы [ 2 ]

0 голосов
/ 25 мая 2019

Rxjs неплохо справляется с этим делом.Вам понадобятся два разных предмета:

  1. Один будет использоваться для сбора и объединения всех запросов
  2. Второй будет использоваться для подписки на результаты

Когда запрос сделан, значение будет передано первому субъекту, но будет возвращен второй, абстрагируясь от внутренней логики объединения запросов.

private values: Subject = new Subject();
private results: Subject = new Subject();

private makeRequest(number: number) {
  this.values.next(number);
  return this.results;
}

Конвейер для объединения запросов может бытьbuffer и debounceTime, как указано в вопросе или другой логике, при необходимости.Когда ответ получен, его просто нужно вставить в результаты. Тема:

constructor(private http: HttpClient) {
  this.values
    .pipe(
      buffer(this.values.pipe(debounceTime(1000))),
      switchMap(values => this.getUrl(values)),
      map(response => this.results.next(response)))
    .subscribe();
}

Я использовал switchMap для имитации асинхронного запроса перед передачей ответа на результаты.

Полный пример здесь: https://angular -8yyvku.stackblitz.io

0 голосов
/ 07 июня 2018
this.makeRequest(1).subscribe();

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(

Вы передаете значение перед подпиской -> Значение теряется.

private values: Subject = new Subject();
private idObservable = this.values.pipe(

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(    

Каждый вызов создает новую наблюдаемую на основе темы.Всякий раз, когда вы отправляете значение, все подписчики получают это значение.

Возможное решение может выглядеть примерно так (я использую новый синтаксис rxjs здесь):

subject: Subject<String> = null;
observable = null;
window = 100;

constructor() {
  this.subject = null;
  this.window = 100;

  this.makeRequest('1').subscribe(console.log)
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  if (!this.subject) {
    this.subject = new ReplaySubject()
    this.observable = this.subject.pipe(
      takeUntil(timer(this.window).pipe(take(1))),
      reduce((url, id, index) => this.combine(url, id), baseUrl),
      flatMap(url => this.request(url)),
      tap(() => this.subject = null),
      share()
    )
  }      

  this.subject.next(id);
  return this.observable;
}  

Где combine создает URL, а request - фактический запрос.

...