Rxjs проблема с оператором concat при последовательном выполнении операций - PullRequest
0 голосов
/ 25 октября 2018

Я использую rxjs 6 и выполняю две асинхронные операции, где важен порядок.

У меня есть этот кусок кода, который отлично работает:

dbmsProxy.createDatastores().subscribe(() => {
    UsersDAO.insert(users).subscribe(() => {
        console.log('FINISHED ALL THE CHAIN');
    });
});

Но когда я пытаюсь использовать concat из rxjs, у меня возникает проблема, потому что выполняется второйдо завершения первого:

concat([dbmsProxy.createDatastores(), UsersDAO.insert(users)]).subscribe();

Ниже методов DBMSProxy

public createDatastores(): Observable<string> {
    const _this: DBMSProxy = this;
    const subject = new Subject<string>();
    const subscription: Subscription = UsersDAO.createDatastore().subscribe(
        onSuccess,
        onError,
        onFinally
    );
    return subject;

    function onSuccess(datastore: Nedb): void {
        console.log(`USERS Datastore Created Successfully`);
        _this.db.users = datastore;
        subject.next('success');
    }

    function onError(err: string) {
        subject.error('error');
        console.error(err);
    }

    function onFinally() {
        subject.complete();
        subscription.unsubscribe();
    }
}

public insertDocuments(documents: any, datastore: Nedb): Subject<any> {
    const subject = new Subject<any>();
    datastore.insert(documents, onInsert);
    return subject;

    function onInsert(err: Error, newDocuments: any) {
        if (err) {
            subject.error(err);
        } else {
            // add to the documents to insert the id just created from nedb when inserting the document
            documents.forEach((document: any, ind: number) => {
                document.id = newDocuments[ind]._id;
            });
            subject.next(documents);
        }
        subject.complete();
    }
}

А ниже методов UsersDAO:

public static createDatastore(): Subject<Nedb | string> {
        const subject = new Subject<Nedb | string>();
        const datastore = new Nedb({
            filename: USERS_DATASTORE_FULL_NAME,
            autoload: true,
            onload
        });
        return subject;

        function onload(err: Error) {
            if (err) {
                subject.error(
                    `Error creating USERS datastore: ${err.name} - ${err.message}`
                );
            } else {
                subject.next(datastore);
            }
            subject.complete();
        }
    }

    public static insert(users: User[]): Observable<any> {
        return DBMSProxy.getInstance()
            .insertDocuments(users, DBMSProxy.getInstance().db.users)
            .pipe(catchError((val: any) => of('Error inserting the users')));
    }

Любое представление о том, что происходит, пожалуйста?

1 Ответ

0 голосов
/ 25 октября 2018

Мое текущее решение состоит в том, чтобы преобразовать Subject в Observable, создать новый Observable со вторым и убрать квадратные скобки (в противном случае я получу обратно наблюдаемые, а не результаты), и это, кажется, работает:

const operations = concat(
    dbmsProxy.createDatastores().asObservable(),
    defer(() => UsersDAO.insert(users))
);
operations.subscribe(onSubscribe);
function onSubscribe(result: any) {
    console.log('Finished all: ', result);
}
...