Как вручную заполнить наблюдаемую angular http - PullRequest
3 голосов
/ 24 января 2020

Я разрабатываю функцию параллельной загрузки файлов с возможностью удалять / отменять текущие http-вызовы. Как только все вызовы завершены / отменены, я уведомляю об этом потребителя.

Для этого я объединяю отдельные наблюдаемые http, используя forkJoin. Но в случае, если пользователь нажимает кнопку «Отмена», мне не следует ждать завершения фактического ответа HTTP.

takeUntill не будет обрабатывать это элегантно, так как будет действовать только после получения следующего значение из исходного потока http.

this.uploadFiles().subscribe(data => {
  console.warn("Upload completed now!!!!!", data);
});

uploadFiles() {
  return forkJoin(
    this.files.map(file => // returns array of observable
        this.uploadFile(file).pipe( catchError(() => of("Error re-emitted as success to prevent early exit"))))
  ).pipe(map(() => {
       // logic for some user friendly statistic
      return data;
    }));
}

Ответы [ 3 ]

2 голосов
/ 24 января 2020

Используйте takeUntil с Субъектом в качестве уведомителя для завершения Наблюдаемых. Вы можете передать идентификатор файла субъекту и использовать filter в takeUntil, чтобы только отменить загрузку файла с заданным идентификатором.

Используйте defaultIfEmpty для предоставления значения, которое указывает на отмененный запрос. Это также предотвращает немедленное завершение внешнего forkJoin после завершения внутреннего пустого запроса.

private cancelUpload$ = new Subject<number>();

uploadFiles() {
  let errorCount = 0, cancelledCount = 0, successCount = 0;
  return forkJoin(this.dummyFiles.map(file =>
    this.uploadFile(file).pipe(
      // react to CANCEL event
      map(response => response == 'CANCEL' ? ++cancelledCount : ++successCount),
      catchError(() => of(++errorCount))
    )
  )).pipe(map(() => ({ errorCount, successCount, cancelledCount })));
}

uploadFile(file: any) {
  http$.pipe(
    ...
    takeUntil(this.cancelUpload$.pipe(filter(id => id == file.id))), // cancel
    defaultIfEmpty('CANCEL'), // provide value when cancelled
  )
}

cancelUpload(file: any) {
  file.uploadStatus = "cancelled";
  this.cancelUpload$.next(file.id) // cancel action
}

https://stackblitz.com/edit/angular-zteeql-e1zacp

1 голос
/ 24 января 2020

Назначьте подписку переменной и отмените ее кнопкой:

$http: Subscription;
this.$http = this.http.post(this.uploadUrl, formData, {
      reportProgress: false
      // observe: 'events',
});

И в функции отмены:

cancelUpload(file: any) {
  file.uploadStatus = "cancelled"; 
  this.$http.unsubscribe();
}
0 голосов
/ 25 января 2020

Помимо takeUntil мы также можем использовать оператор merge или race для обработки этого сценария. Однако это не меняет базовый логи c.

Шаг 1: Создайте Subject для отдельной загрузки

file.cancelUpload$ = new Subject();

Шаг 2 : Объединить эту тему с фактическим вызовом http

слияние завершит поток, если какой-либо из наблюдаемых выдает ошибку. то есть, когда мы выдаем ошибку от субъекта cancelUpload$, запрос http будет автоматически отменен (см. вкладку «Сеть»).

return merge(file.cancelUpload$, $http.pipe(...

Шаг 3: Фактический код отмены

cancelUpload(file: any) {
  file.uploadStatus = "cancelled";
  file.cancelUpload$.error(file.uploadStatus);// implicitly subject gets completed
}

Шаг 4. Заполните тему cancelUpload$ в случае ошибки / успеха загрузки

Это обеспечит выполнение операции merge, поскольку оба потока сейчас завершено. И поэтому forkJoin получит ответ.

См. https://stackblitz.com/edit/angular-zteeql?file=src%2Fapp%2Fhttp-example.ts

  uploadFiles() {
    let errorCount = 0,cancelledCount = 0, successCount = 0;
    return forkJoin(
      this.dummyFiles
        .map(file =>
          this.uploadFile(file).pipe(
            catchError(() => of("Error re-emitted as success")) // value doesn't matter
          )
        )
    ).pipe(
      map(() => { // map would receive array of files in the order it was subscribed
        this.dummyFiles.forEach(file => {
          switch (file.uploadStatus) {
            case "success": successCount++; break;
            case "cancelled": cancelledCount++; break;
            case "error": errorCount++; break;
          }
        });
        return { errorCount, successCount, cancelledCount };
      })
    );
  }

  uploadFile(file: any) {
    const formData = new FormData();
    const binaryContent = new Blob([Array(1000).join("some random text")], {
      type: "text/plain"
    }); // dummy data to upload
    formData.append("file", binaryContent);
    const $http = this.http.post(this.uploadUrl, formData, {
      reportProgress: false
      // observe: 'events',
      // withCredentials: true
    });
    file.cancelUpload$ = new Subject();
    file.uploadStatus = "inProgress";
    return merge(
      file.cancelUpload$,
      $http.pipe(
        tap(data => {
          file.uploadStatus = "uploaded";
          file.cancelUpload$.complete();
        }),
        catchError(event => {
          file.uploadStatus = "error";
          file.cancelUpload$.complete();
          return throwError("error");
        })
      )
    );
  }

...