Синхронно обрабатывает Observable, чтобы предотвратить слишком много тайм-аутов HTTP-запросов с использованием rxjs Angular и ngrx - PullRequest
0 голосов
/ 02 мая 2018

У меня есть следующий код. Он получает массив Patients и создает объект строк this.rows, который я показываю в таблице на frontEnd этого углового компонента 4 (я также использую rxjs 5.5).

Моя проблема в том, что свойство hasAlert каждой строки назначается вызовом hasAlerts (). В методе HasAlerts я делаю http-запрос для каждого пациента через this.dataService.fetchItems<Observation>(.

Когда имеется много пациентов, слишком много HTTP-запросов будет выполняться асинхронно, и они начнут давать сбой (тайм-аут) из HasAlerts (). Есть ли способ уменьшить это или обработать по одному Observable одновременно из hasAlerts ()?

Ниже приведены возможные способы решения этой проблемы.

  1. Я думаю, что concatMap может быть полезным, но я не уверен, как его использовать. Также следует использовать его для обработки каждого пациента / строки или попытаться объединить каждый наблюдаемый hasAlert () в список, а затем попытаться использовать concatMap для их обработки по одному?
  2. Обходной путь - это оболочка переменной PatientAlertsProcessed, которую я добавил с комментарием //, которая используется для регулирования метода hasAlertsMethod. Я ограничиваю его только 15 раз, чтобы избежать проблем с тайм-аутом. Это не идеально, потому что я не могу запустить его снова после завершения этих 15 асинхронных запросов.

Код ниже

ngOnInit(): void {
    this.store.dispatch(new patients.Load([]));

    this.patients$ = this.store.select(fromPatients.getAll); 
    var patientsAlertsProcessed =0;
    this.patients$.debounceTime(2000).map(p =>{//Emits Observable<Patient[]>s, the debounceTime is to take the last one as it builds up
      this.rows = p.map(pat => {// p = Patient[], iterate through the array of patients
        var observations=0;
        var rowX= {
          username: pat.username,
          id: pat.id,
          hasAlert:false, 
        };

        if (patientsAlertsProcessed<15){// this is done to throttle the HasAlerts Method
          this.hasAlerts(pat).do(x => {
            observations++;
            if (observations>0)
            {
              rowX.hasAlert=true;
            }
          }).subscribe();
          patientsAlertsProcessed++;
        }
        return rowX;
      });
    }).subscribe(
       ()=> { },
       ()=> {
        this.table.recalculatePages();
      }
    );

  }
  hasAlerts(pat: Patient): Observable<Observation> {
    var obs$= this.dataService.fetchItems<Observation>(// this is making an HTTP get request 
        "Observation",
        null,
        pat.id
      ).filter(function (x){
        if (x.category.coding[0].code == "GlucoseEvent"){ 
            return true;
        }
        else{
          return false; 
        }
      }
    );
    return obs$;
  }

1 Ответ

0 голосов
/ 02 мая 2018

Вы можете попробовать что-то в этом роде.

const concurrency = 10; 
const rowsWithAlerts = [];

this.patients$.debounceTime(2000)
.switchMap(patients => Observable.from(patients)) // patients is of type Patient[]
.mergeMap(patient => {
   rowsWithAlerts.push(patient);
   this.hasAlerts(patient).do(
   hasAlertsResult => {
     // here you have hold to both the patient and the result of
     // hasAlerts call
     patient.hasAlerts = hasAlertsResult;
   }}), concurrency)
.subscribe(
   () => this.rows = rowsWithAlerts;
)

Ключевым моментом здесь является использование оператора mergeMap с уровнем параллелизма, установленным в значение, в этом примере 10, но, очевидно, может быть любым.

Это позволяет ограничить количество наблюдаемых, на которые вы одновременно подписываетесь, что в вашем случае означает ограничение количества http-звонков, которые вы делаете.

...