Я пытался получить извлеченные данные на основе измененных событий. Результат, который я ожидал, должен быть объектом, вместо этого я вижу наблюдаемый, поскольку выборка дает обещание.
flatMap
использовался для преобразования promise
в объект, но, похоже, я ошибался. Пожалуйста, дайте мне знать, как я могу это исправить.
/** Receives UpdateEvent */
const changes$ = new Subject<FileChangesEvent>();
/** mock for 'this' */
const context = {
notify: (...args) => console.log('[CONTEXT.NOTIFY]: ', ...args),
processAdds: (input) => of(input).pipe(delay(input.debounce || 1), map(i => Object.assign(i, ...fetch(i.url)))),
processUpdates: (input) => of(input).pipe(delay(input.debounce || 1), map(i => Object.assign(i, ...fetch(i.url)))),
processDeletes: (input) => of(input).pipe(delay(input.debounce || 1), map(i => Object.assign(i, { modified: 'fetchUpdates' }))),
};
/** Process FilesChange event */
const stream$ = changes$
.pipe(
/** Side effect for log current step */
tap(event => console.log('onFileChanges trigged, event data: ', event.changes)),
/** Fetching array of changes */
map(event => event.changes),
/** Use it for wait all 'forked' observables */
flatMap(
/** Process each record in parallel */
changes => forkJoin(
changes.map(change =>
/** Update processing */
of(change)
.pipe(
tap(input => console.log('Validating the file Type: ', input)),
/** fetching data based on update type */
flatMap(input =>
input.type === FileChangeType.ADDED
? context.processAdds(input)
: context.processUpdates(input),
),
/** remove large files from update */
map(input => (
input.type === FileChangeType.UPDATED && input.size < CONTENT_SIZE_LIMIT
? Object.assign(input, { value: '' })
: input
)),
/** side effect for pubsub notifications */
tap(updates => context.notify(updates)),
),
),
),
),
);
/** When updates stream was finished */
stream$.subscribe(
(...args) => console.log('[TICK]: ', ...args), // When all event updates was processed
(...args) => console.log('[ERROR]: ', ...args),
(...args) => console.log('[DONE]: ', ...args),
);
export const updateEventsStream = (event) => changes$.next(event);