Я использую рецепт NestJS CQRS для управления взаимодействиями между двумя объектами: User и UserProfile.Архитектура представляет собой сервер API NestJS шлюза + сервер NestJS для каждого микросервиса (User, UserProfile и т. Д.).
Я уже установил базовые взаимодействия через модули User и UserProfile в API Gateway со своими собственными sagas /события / команды:
- Когда создается пользователь , создается профиль пользователя
- Когда профиль пользователя создание завершается неудачно, ранее создано пользователь удален
Подробно:
В пользовательском модуль , CreateUser команда вызывает событие UserCreated , которое перехватывается пользователем saga , что вызывает команду CreateUserProfile (из модуля UserProfile ).
Если последнее не удается, событие UserProfileFailedToCreate вызывается и перехватывается UserProfile saga , что вызывает команду DeleteUser (от пользователя модуль ).
Evвсе работает нормально.
Если команда CreateUser не выполнена, I resolve(Promise.reject(new HttpException(error, error.status))
, которая указывает конечному пользователю, что что-то пошло не так во время создания пользователя.
Моя проблема заключается в том, что я не могу повторить такое же поведение для команды CreateUserProfile , поскольку обещание HTTP-запроса уже было разрешено из первой команды, очевидно.
Итак, мой вопрос: есть лиспособ заставить команду потерпеть неудачу, если последующая команда терпит неудачу в саге?Я понимаю, что HTTP-запрос полностью отключен от любых последующих команд, запускаемых сагой, но я хочу знать, кто-нибудь уже играл с событиями или чем-то еще, чтобы воспроизвести этот поток данных?
Одна из причинЯ использую CQRS, кроме того, что у меня гораздо более понятный код для взаимодействия данных между микросервисами, он позволяет откатывать действия с репозиториями в случае сбоя любой из связанных команд, что прекрасно работает. Но мне нужен способ указать конечному пользователю, что цепочка прошла через проблему и была откатана.
UserController.ts
@Post('createUser')
async createUser(@Body() createUserDto: CreateUserDto): Promise<{user: IAuthUser, token: string}> {
const { authUser } = await this.authService.createAuthUser(createUserDto);
// this is executed after resolve() in CreateUserCommand
return {user: authUser, token: this.authService.createAccessTokenFromUser(authUser)};
}
UserService.ts
async createAuthUser(createUserDto: CreateUserDto): Promise<{authUser: IAuthUser}> {
return await this.commandBus
.execute(new CreateAuthUserCommand(createUserDto))
.catch(error => { throw new HttpException(error, error.status); });
}
CreateUserCommand.ts
async execute(command: CreateAuthUserCommand, resolve: (value?) => void) {
const { createUserDto } = command;
const createAuthUserDto: CreateAuthUserDto = {
email: createUserDto.email,
password: createUserDto.password,
phoneNumber: createUserDto.phoneNumber,
};
try {
const user = this.publisher.mergeObjectContext(
await this.client
.send<IAuthUser>({ cmd: 'createAuthUser' }, createAuthUserDto)
.toPromise()
.then((dbUser: IAuthUser) => {
const {password, passwordConfirm, ...publicUser} = Object.assign(dbUser, createUserDto);
return new AuthUser(publicUser);
}),
);
user.notifyCreated();
user.commit();
resolve(user); // <== This makes the HTTP request return its reponse
} catch (error) {
resolve(Promise.reject(error));
}
}
UserSagas.ts
authUserCreated = (event$: EventObservable<any>): Observable<ICommand> => {
return event$
.ofType(AuthUserCreatedEvent)
.pipe(
map(event => {
const createUserProfileDto: CreateUserProfileDto = {
avatarUrl: '',
firstName: event.authUser.firstName,
lastName: event.authUser.lastName,
nationality: '',
userId: event.authUser.id,
username: event.authUser.username,
};
return new CreateUserProfileCommand(createUserProfileDto);
}),
);
}
CreateUserProfileCommand.ts
async execute(command: CreateUserProfileCommand, resolve: (value?) => void) {
const { createUserProfileDto } = command;
try {
const userProfile = this.publisher.mergeObjectContext(
await this.client
.send<IUserProfile>({ cmd: 'createUserProfile' }, createUserProfileDto)
.toPromise()
.then((dbUserProfile: IUserProfile) => new UserProfile(dbUserProfile)),
);
userProfile.notifyCreated();
userProfile.commit();
resolve(userProfile);
} catch (error) {
const userProfile = this.publisher.mergeObjectContext(new UserProfile({id: createUserProfileDto.userId} as IUserProfile));
userProfile.notifyFailedToCreate();
userProfile.commit();
resolve(Promise.reject(new HttpException(error, 500)).catch(() => {}));
}
}
UserProfileSagas.ts
userProfileFailedToCreate = (event$: EventObservable<any>): Observable<ICommand> => {
return event$
.ofType(UserProfileFailedToCreateEvent)
.pipe(
map(event => {
return new DeleteAuthUserCommand(event.userProfile);
}),
);
}
DeleteUserCommand.ts
async execute(command: DeleteAuthUserCommand, resolve: (value?) => void) {
const { deleteAuthUserDto } = command;
try {
const user = this.publisher.mergeObjectContext(
await this.client
.send<IAuthUser>({ cmd: 'deleteAuthUser' }, deleteAuthUserDto)
.toPromise()
.then(() => new AuthUser({} as IAuthUser)),
);
user.notifyDeleted();
user.commit();
resolve(user);
} catch (error) {
resolve(Promise.reject(new HttpException(error, error.status)).catch(() => {}));
}
}