Я боролся с этой проблемой и чувствую, что у меня есть фундаментальное недоразумение.Я использую библиотеку redux-observable
в React
, которая склеивает redux
вместе с RxJS
для обработки асинхронности.Моя проблема в том, что мне нужно обработать большую загрузку, и я хочу показать прогресс при загрузке файла.
Функция uploadFileEpic
должна возвращать Observable<Action>
для работы с redux-observable
.uploadObservable
представляет рабочий процесс, который я хочу выполнить.Если я просто возвращаю uploadObservable
, загрузка работает, но я не получаю никаких действий handleUploadFileProgress
от progressSubscriber
в вызове ajax
.В идеале progressSubscriber
будет добавлять элементы к другой наблюдаемой, которую я мог бы объединить с uploadObservable
.Вы видите, что я пытаюсь использовать merge
здесь, но компилятор TypeScript жалуется, говоря, что возврат не может быть назначен на ObservableInput
.
Я продолжаю ходить кругами, так что я чувствую, что мое понимание должно быть в корне неверным.Я чувствую, что мне здесь не хватает простой RxJS
магии.Спасибо за помощь!
import { Observable, Observer, Subscriber, Subject, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { ofType } from 'redux-observable';
import { catchError, delay, map, mergeMap, tap, merge } from 'rxjs/operators';
import { apis } from '../../config';
export const enum ActionType {
InitialFileUpload
FileProgress
UploadFileSuccess
UploadFileFail
}
const handleInitialFileUpload = (file: File, timeLimit: number) => ({
type: ActionType.InitialFileUpload,
file,
timeLimit
})
const handleFileProgress = (file: File, percentComplete: number) => ({
type: ActionType.FileProgress,
file,
percentComplete
})
const handleUploadFileSuccess = (file: File, timeLimit: number) => ({
type: ActionType.UploadFileSuccess,
file,
timeLimit
})
const handleUploadFileFail = (file: File, timeLimit: number) => ({
type: ActionType.UploadFileFail,
file,
timeLimit
})
export const uploadFileEpic= action$ =>
action$.pipe(
ofType(ActionType.InitialFileUpload),
mergeMap((action: any) => {
const { file, timeLimit } = action;
const data = new FormData()
data.append('importFile', file, file.name)
data.append('timeLimit', timeLimit)
const progressSubject = new Subject();
const ajaxRequest = {
url: apis.gateway.run,
method: 'POST',
body: data,
headers: {},
progressSubscriber: Subscriber.create(
(e: ProgressEvent) => {
const percentComplete = Math.round((e.loaded / e.total) * 100)
console.log("Progress event")
progressSubject.next(handleUploadFileProgress(file, percentComplete))
}
)
}
const uploadObservable = ajax(ajaxRequest)
.pipe(
map(res => handleUploadFileSuccess(file)),
delay(SUCCESSFUL_UPLOAD_NOTIFICATION_LENGTH),
map(() => handleUploadFileRemove(file)),
catchError(error => of(handleUploadFileFail(file, error.response)))
)
return merge(uploadObservable, progressSubject)
}
)
)