Реализуете сложный сценарий загрузки файлов с помощью конвейерных операторов Rx Js? - PullRequest
2 голосов
/ 10 февраля 2020

У меня есть сценарий, в котором мне нужно последовательно загружать серию файлов на веб-сервер, включая фрагменты для каждого файла. Процесс загрузки довольно прост, если он реализован процедурно, и выглядит примерно так:

foreach file in files:
  const result = await createMultipartUpload();

  do {
    const chunk = takeChunk();
    const result = await uploadChunk(chunk);

    chunksRemain = doChunksRemain();
  } while (chunksRemain);

  await completeUpload();

То есть для каждого процесса последовательной загрузки выполняется один запрос на создание, несколько фрагментов загружаются последовательно, но я не знать, сколько чанков будет загружено - и тогда будет выполнен один запрос на завершение.

Мне нужно преобразовать это в реактивный код с помощью Rx Js. То, что у меня есть, на самом деле работает для загрузок с одним чанком, где нужен только один чанк. Но я не выяснил, как динамически генерировать наблюдаемые фрагменты загрузки и помещать их в некоторый массив, из которого они затем могут быть извлечены.

Вот что у меня есть:

  // Contains single request. 
  public createMultipartUpload(file: File): Observable<File> {
    return this.filesHttp.store(file);
  }

  // Contains two HTTP requests. Generates a presigned URL, then uploads the chunk with the presaged URL.
  public uploadChunk(file: File): Observable<File> {
    let chunk: Blob;

    // Determine the bytes of the file to take and assign to chunk.
    if (file.originalFile.size < this.chunkSize) {
      chunk = file.originalFile;
    } else {
      chunk = file.originalFile.slice(file.currentByteOffset, file.currentByteOffset + this.chunkSize);
    }

    return this.filesHttp.getPresignedUrlForUpload(file)
      .pipe(
        concatMap(res => {
          return this.filesHttp.upload(res.uri, chunk);
        }),
        map(f => {
          // Set next chunk to take
          f.currentByteOffset += this.chunkSize;
          return f;
        })
      );
  }

  public completeUpload(file: File): Observable<MultipartCompletionResponse> {
    return this.filesHttp.complete(file);
  }

  from(this.files)
    .pipe(
      concatMap(f => this.createMultipartUpload(f)),
      switchMap(f => this.uploadChunk(f)),
      concatMap(f => this.completeUpload(f))
    ).subscribe(output => {
      // done?
    });

Как я могу последовательно загружать все чанки, а не только первый, с учетом этого сценария?

1 Ответ

2 голосов
/ 11 февраля 2020

Используйте expand для обработки динамического c количества наблюдаемых. expand рекурсивно отображается в Observable и получает его вывод в качестве следующего ввода. Завершите эту рекурсию возвратом EMPTY.

// Upload mutiple files one after another with concat
concat(...this.files.map(file => uploadFile(file))).subscribe(console.log);

// Upload single file in multiple chunks with expand
uploadFile(file): Observable<any> {
  return createMultipartUpload(file).pipe(   
    // add this for a do...while logic where doChunksRemain() shouldn't be called for 
    // the first uploadChunk()
    //switchMap(f => uploadChunk(f)),

    // Upload next chunks until doChunksRemain() returns false
    expand(f => doChunksRemain() ? uploadChunk(f) : EMPTY),
    // only take the result of the last uploaded chunk
    last(),
    // map to completeUpload when we receive the last result
    switchMap(f => completeUpload())
  );
}

https://stackblitz.com/edit/rxjs-d5vpyf

...