Angular RX JS Опрос из вложенных наблюдаемых - PullRequest
1 голос
/ 19 января 2020

У меня есть служба, которую распознаватель использует для генерации и возврата отчета.

При первоначальном получении службы вызывается конечная точка REST /report, которая запускает рабочую работу на сервере, так как отчет интенсивно использует процессор и занимает более 30 секунд. Конечная точка report возвращает идентификатор рабочего задания.

Затем мне нужно опросить конечную точку REST рабочего задания /job/job_id с соответствующим идентификатором для задания. Я продолжаю опрос до тех пор, пока он не вернет состояние «завершено» и содержит готовый отчет.

Затем этот окончательный результат возвращается из службы, и распознаватель использует его.

Я не смог чтобы это работало с опросом. Я передаю ответ в начальную конечную точку отчета в switchMap, а затем использую интервал для повторного опроса каждые 500 мс конечной точки /job/job_id. Затем я пытаюсь переключить карту ответа на запрос и вернуться, если она завершена. Я впервые использую switchMap и опросы, поэтому я не уверен, правильно ли я это использую.

Вот моя последняя попытка кода:

getDepartmentReport() {

  return this.http
    .get<any>(reportUrl, this.getAuthOptions(true))
    .pipe(switchMap(initialResponse => {

      interval(500).pipe(
        switchMap(() => {
          return this.http.get<any>(workerUrl + initialResponse.id, this.getAuthOptions(true))
            .pipe(
              switchMap(pollResponse => {
                if(pollResponse.state === 'completed') {
                  return pollResponse;
                }
            })
      }));
  }));
}

Это на самом деле не скомпилируется. Это выдает следующую ошибку:

Argument of type '(initialResponse: any) => void' is not assignable to parameter of type '(value: any, index: number) => ObservableInput<any>'.
  Type 'void' is not assignable to type 'ObservableInput<any>'.

56         .pipe(switchMap(initialResponse => {

Я предполагаю, что это происходит потому, что при неполных ответах на опрос не существует оператора возврата для обработки этой ситуации и возвращается пустота.

Кто-нибудь получил Любые идеи? Я в тупике.

1 Ответ

1 голос
/ 19 января 2020

Это интересная проблема.

Вы получаете эту ошибку, потому что switchMap должен вернуть Наблюдаемый . В вашем коде вы ничего не возвращаете, вы просто запускаете интервал.

Вы также должны указать свой интервал, когда прекратить опрос. Это может быть достигнуто с помощью оператора takeWhile. Чтобы разделить вещи немного больше, я создал пользовательский оператор, в котором будет происходить опрос. Делая так, вы можете использовать этот оператор и в других местах.

Вот мой подход:

// ===== Server =====

let crtReportId = 1;
let crtReportStatus: { status: string, id: number };

const getReportFromBE = () => {
  let initialId = crtReportId;

  crtReportStatus = { status: 'pending', id: initialId };

  // It takes some time...
  timer(2000)
    .subscribe(() => crtReportStatus = { status: 'completed', id: initialId })

  return of(crtReportId++);
}

const getWorkerStatus = id => of(crtReportStatus);

// ===== Client =====

type CustomPollOperator = (data: any, cond: (d) => boolean, ms: number) => Observable<any>

const pollFor: CustomPollOperator = (data, cond, ms) => {
  let shouldPoll = true;

  return interval(ms)
    .pipe(
      tap(() => console.warn('pooling', shouldPoll)),
      takeWhile(() => shouldPoll),
      switchMap(() => getWorkerStatus(data)),
      tap(res => {
        if (cond(res)) {
          shouldPoll = false;
        }
      })
    )
}

const isWorkerCompleted = w => w.status === 'completed';

const getReports = () => {
  return getReportFromBE()
    .pipe(
      switchMap(workerId => pollFor(workerId,isWorkerCompleted, 200))
    )
}

getReports().subscribe((res) => console.log('result', res))

StackBlitz .

...