TL: DR : вам не нужна какая-то необычная среда, особенно если вы используете только внутрипроцессное взаимодействие, чтобы применять архитектуру CQRS.Достаточно родного EventEmitter
из модуля events
.Если вы хотите, чтобы межпроцессное взаимодействие servicebus
действительно хорошо работало.Чтобы взглянуть на пример реализации (из следующего длинного варианта ответа), вы можете погрузиться в код этого репозитория: простой узел cqrs
Давайте рассмотрим пример очень простого чатаприложение, в которое можно отправлять сообщения, если чат не закрыт, и, как и в отличие от сообщений.
Нашим основным агрегатом (или концептуально корнем агрегата) является Chat
(writeModel/domain/chat.js
):
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
Тогда у нас есть Message
агрегат (writeModel/domain/message.js
):
const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
Object.freeze({
id,
chatId,
userId,
content,
sentAt,
messageLikes,
});
Поведение для отправки сообщения может быть (writeModel/domain/chat.js
):
const invariant = require('invariant');
const { Message } = require('./message');
const Chat = ({ id, isClosed } = {}) =>
Object.freeze({
id,
isClosed,
});
const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
invariant(!chatState.isClosed, "can't post in a closed chat");
return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};
Теперь нам нужны команды (writeModel/domain/commands.js
):
const commands = {
types: {
SEND_MESSAGE: '[chat] send a message',
},
sendMessage({ chatId, userId, content, sentAt }) {
return Object.freeze({
type: commands.types.SEND_MESSAGE,
payload: {
chatId,
userId,
content,
sentAt,
},
});
},
};
module.exports = {
commands,
};
Так как мы находимся в javascript, у нас нет interface
для обеспечения абстракции, поэтому мы используем higher order functions
(writeModel/domain/getChatOfId.js
):
const { Chat } = require('./message');
const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
try {
const chatState = await getChatOfId(id);
if (typeof chatState === 'undefined') {
throw chatState;
}
return chatState;
} catch (e) {
throw new Error(`chat with id ${id} was not found`);
}
};
module.exports = {
getChatOfId,
};
(writeModel/domain/saveMessage.js
):
const { Message } = require('./message');
const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;
module.exports = {
saveMessage,
};
Теперь нам нужно реализовать наш commandHandlers
(уровень обслуживания приложений):
(writeModel/commandHandlers/handleSendMessage.js
)
const { sendMessage } = require('../domain/chat');
const handleSendMessage = ({
getChatOfId,
getNextMessageId,
saveMessage,
}) => async sendMessageCommandPayload => {
const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
const chat = await getChatOfId(chatId);
return saveMessage(
sendMessage({
chatState: chat,
messageId: getNextMessageId(),
userId,
content,
sentAt,
}),
);
};
module.exports = {
handleSendMessage,
};
Поскольку у нас нет interface
в javascript, мы используем higher order functions
для применения принципа инверсии зависимости посредством внедрения зависимостейes во время выполнения.
Затем мы можем реализовать составной корень модели записи: (`writeModel / index.js):
const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');
const SimpleNodeCQRSwriteModel = ({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
}) => {
handleCommand(
commands.types.SEND_MESSAGE,
handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
);
};
module.exports = {
SimpleNodeCQRSwriteModel,
};
Ваши commands
и command handler
не являютсяСвязанные вместе, вы можете затем обеспечить реализацию этих функций во время выполнения, например, с базой данных в памяти и узлом EventEmitter
(writeModel/infrastructure/inMemory/index.js
):
const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');
const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
const listeners = [];
const db = {
...initialDbState,
};
const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);
const updateDb = updater => {
updater();
listeners.map(listener => listener(db));
};
const saveMessageInMemory = saveMessage(async messageState => {
updateDb(() => (db.messages[messageState.id] = messageState));
});
const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);
const getNextMessageUuid = getNextMessageId(uuid);
return {
addOnDbUpdatedListener,
saveMessage: saveMessageInMemory,
getChatOfId: getChatOfIdFromMemory,
getNextMessageId: getNextMessageUuid,
};
};
module.exports = {
InMemoryRepository,
};
И нашим TestWriteModel
, связывающим еговсе вместе:
const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');
const TestWriteModel = () => {
const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
const commandEmitter = new EventEmitter();
const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
const handleCommand = (commandType, commandHandler) => {
commandEmitter.on(commandType, commandHandler);
};
return SimpleNodeCQRSwriteModel({
dispatchCommand,
handleCommand,
getChatOfId,
getNextMessageId,
saveMessage,
});
};
Вы можете погрузиться в код (с очень простым read model
) в этом репозитории: простой узел cqrs