RXJS, как отложить каждую наблюдаемую (HTTP-запрос) и объединить все запросы вывода - PullRequest
1 голос
/ 27 сентября 2019

У меня проблема с установкой интервала между http-запросами в пределах одной наблюдаемой.Дело в том, что я хочу иметь задержку между каждым http-запросом, ждать завершения всех запросов и выполнять операции с объединенными данными из всех запросов.Конечно, длина массива документов неизвестна.

Пример кода:

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';

  return of(document); // in real world http.post()
}

const documentsToSave$ = zip(
  documents,
  interval(200),
  document => {
    document['someDataToBeInserted'] = {'data': 123};
    return saveDocumentService(document);
  }
);

const sub = forkJoin(documentsToSave$).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});

При таком подходе выводится только последнее значение.

Спасибо.

Ответы [ 4 ]

2 голосов
/ 27 сентября 2019

Вы можете использовать merge с таймером и игнорировать вывод указанного таймера:

import { of, from, timer } from 'rxjs'
import { concatMap, merge, ignoreElements, tap, toArray } from 'rxjs/operators'

const documents = [
  {
    'documentId': 1,
    'documentName': 'DocName1'
  },
  {
    'documentId': 2,
    'documentName': 'DocName2'
  },
];

function saveDocumentService(document) {
  document['someAnotherData'] = '123';
  return of(document)
}
from(documents)
  .pipe(
    concatMap(url => saveDocumentService(url).pipe(
      tap(res => console.log('Saved document...')),
      merge(timer(1000).pipe(ignoreElements()))
    )),
    toArray(),
  )
  .subscribe(documents => {
    console.log('Sub:', documents)
  })

Stackblitz

1 голос
/ 28 сентября 2019

Поскольку у вас есть documentsToSave$ типа Observable<Observable<any>>, замените forkJoin на mergeAll и toArray

const sub = documentsToSave$.pipe(mergeAll(), toArray()).subscribe(documents => {
  console.log('All documents uploaded', documents); // array of responses
});
  }

, вот рабочий пример

0 голосов
/ 27 сентября 2019

Попробуйте использовать combineLatest.Но имейте в виду, что comblatest не будет выдавать начальное значение, пока каждое наблюдаемое не выдаст хотя бы одно значение.Вы можете просмотреть документацию для получения дополнительной информации: объединитьПоследний

0 голосов
/ 27 сентября 2019

Вы можете использовать оператор rxjs pipe для объединения take и interval как

counter: number = 0;
items: string[] = ["one", "tow", "three", "four"];

  ngOnInit() {
    interval(2000)
      .pipe(take(this.items.length))
      .subscribe(res => {
        console.log(console.log(this.items[this.counter++]));
      });
  }

Редактировать: Вам необходимо создать наблюдаемый поток, используя Observable.create где вы можете использовать setInterval для создания наблюдаемых через равные промежутки времени, а в конце вы можете пометить его как завершенный.

После подписки у нас будут отдельные методы, выполняемые после выдачи нового значения и посленаблюдаемое помечено как завершенное

obs: Observable<any>;
counter: number = 0;
items: string[] = ["one", "two", "three", "four"];



ngOnInit() {
    this.obs = Observable.create(observer => {
      let intervalID = setInterval(() => {
        observer.next(this.items[this.counter++]);
        if (this.counter >= this.items.length) {
          clearInterval(intervalID);
          observer.complete();
        }
      }, 1000);
    });

    this.obs.subscribe(
      res => {
        console.log(res);
      },
      err => {
        console.log(`Error: ${err}`);
      },
      () => {
        console.log("complete");
      }
    );
  }

Stackblitz в: https://stackblitz.com/edit/angular-regular-interval-observables-with-complete

...