Я использую rxjs 6 и выполняю две асинхронные операции, где важен порядок.
У меня есть этот кусок кода, который отлично работает:
dbmsProxy.createDatastores().subscribe(() => {
UsersDAO.insert(users).subscribe(() => {
console.log('FINISHED ALL THE CHAIN');
});
});
Но когда я пытаюсь использовать concat
из rxjs
, у меня возникает проблема, потому что выполняется второйдо завершения первого:
concat([dbmsProxy.createDatastores(), UsersDAO.insert(users)]).subscribe();
Ниже методов DBMSProxy
public createDatastores(): Observable<string> {
const _this: DBMSProxy = this;
const subject = new Subject<string>();
const subscription: Subscription = UsersDAO.createDatastore().subscribe(
onSuccess,
onError,
onFinally
);
return subject;
function onSuccess(datastore: Nedb): void {
console.log(`USERS Datastore Created Successfully`);
_this.db.users = datastore;
subject.next('success');
}
function onError(err: string) {
subject.error('error');
console.error(err);
}
function onFinally() {
subject.complete();
subscription.unsubscribe();
}
}
public insertDocuments(documents: any, datastore: Nedb): Subject<any> {
const subject = new Subject<any>();
datastore.insert(documents, onInsert);
return subject;
function onInsert(err: Error, newDocuments: any) {
if (err) {
subject.error(err);
} else {
// add to the documents to insert the id just created from nedb when inserting the document
documents.forEach((document: any, ind: number) => {
document.id = newDocuments[ind]._id;
});
subject.next(documents);
}
subject.complete();
}
}
А ниже методов UsersDAO:
public static createDatastore(): Subject<Nedb | string> {
const subject = new Subject<Nedb | string>();
const datastore = new Nedb({
filename: USERS_DATASTORE_FULL_NAME,
autoload: true,
onload
});
return subject;
function onload(err: Error) {
if (err) {
subject.error(
`Error creating USERS datastore: ${err.name} - ${err.message}`
);
} else {
subject.next(datastore);
}
subject.complete();
}
}
public static insert(users: User[]): Observable<any> {
return DBMSProxy.getInstance()
.insertDocuments(users, DBMSProxy.getInstance().db.users)
.pipe(catchError((val: any) => of('Error inserting the users')));
}
Любое представление о том, что происходит, пожалуйста?