Отправка сообщений CQRS в Node.js - PullRequest
0 голосов
/ 17 декабря 2018

Я хочу сделать CQRS для Node-приложения.

Я не Node, я из .NET, которая имеет отличную библиотеку MediatR, которая отправляет команды / запросы посреднику, которыйможет использоваться для разделения запросов и обработчиков.Так что это позволяет очень простой и элегантный CQRS.

В мире Node я нашел много библиотек / блогов, но они также всегда включают Event Sourcing.Я не заинтересован в ES.

Я могу хорошо моделировать команды и запросы, но что тогда?Они должны быть отправлены куда-то, в отрыве, чтобы избежать путаницы.

Из того, что я знаю о платформе Node, возможное решение - использовать шаблон наблюдателя (через библиотеку RxJs), поэтомуконтроллер может отправлять сообщения (например, запросы CQRS) наблюдателю, который затем публикует соответствующие события для подписчиков (то есть обработчиков запросов).Это разъединяет контроллеры и сервисы в DDD-подобном дизайне.Хотя я не уверен, как передать результаты обратно в контроллер.

Это как другие люди делают это?Есть ли лучший способ в Node?

1 Ответ

0 голосов
/ 18 декабря 2018

TL: DR : вам не нужна какая-то необычная среда, особенно если вы используете только внутрипроцессное взаимодействие, чтобы применять архитектуру CQRS.Достаточно родного EventEmitter из модуля events.Если вы хотите, чтобы межпроцессное взаимодействие servicebus действительно хорошо работало.Чтобы взглянуть на пример реализации (из следующего длинного варианта ответа), вы можете погрузиться в код этого репозитория: простой узел cqrs

Давайте рассмотрим пример очень простого чатаприложение, в которое можно отправлять сообщения, если чат не закрыт, и, как и в отличие от сообщений.

Нашим основным агрегатом (или концептуально корнем агрегата) является Chat (writeModel/domain/chat.js):

const Chat = ({ id, isClosed } = {}) =>
  Object.freeze({
    id,
    isClosed,
  });

Тогда у нас есть Message агрегат (writeModel/domain/message.js):

const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
  Object.freeze({
    id,
    chatId,
    userId,
    content,
    sentAt,
    messageLikes,
  });

Поведение для отправки сообщения может быть (writeModel/domain/chat.js):

const invariant = require('invariant');
const { Message } = require('./message');

const Chat = ({ id, isClosed } = {}) =>
  Object.freeze({
    id,
    isClosed,
  });

const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
  invariant(!chatState.isClosed, "can't post in a closed chat");
  return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};

Теперь нам нужны команды (writeModel/domain/commands.js):

const commands = {
  types: {
    SEND_MESSAGE: '[chat] send a message',
  },
  sendMessage({ chatId, userId, content, sentAt }) {
    return Object.freeze({
      type: commands.types.SEND_MESSAGE,
      payload: {
        chatId,
        userId,
        content,
        sentAt,
      },
    });
  },
};

module.exports = {
  commands,
};

Так как мы находимся в javascript, у нас нет interface для обеспечения абстракции, поэтому мы используем higher order functions (writeModel/domain/getChatOfId.js):

const { Chat } = require('./message');

const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
  try {
    const chatState = await getChatOfId(id);
    if (typeof chatState === 'undefined') {
      throw chatState;
    }
    return chatState;
  } catch (e) {
    throw new Error(`chat with id ${id} was not found`);
  }
};

module.exports = {
  getChatOfId,
};

(writeModel/domain/saveMessage.js):

const { Message } = require('./message');

const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;

module.exports = {
  saveMessage,
};

Теперь нам нужно реализовать наш commandHandlers (уровень обслуживания приложений):

(writeModel/commandHandlers/handleSendMessage.js)

const { sendMessage } = require('../domain/chat');

const handleSendMessage = ({
  getChatOfId,
  getNextMessageId,
  saveMessage,
}) => async sendMessageCommandPayload => {
  const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
  const chat = await getChatOfId(chatId);
  return saveMessage(
    sendMessage({
      chatState: chat,
      messageId: getNextMessageId(),
      userId,
      content,
      sentAt,
    }),
  );
};

module.exports = {
  handleSendMessage,
};

Поскольку у нас нет interface в javascript, мы используем higher order functions для применения принципа инверсии зависимости посредством внедрения зависимостейes во время выполнения.

Затем мы можем реализовать составной корень модели записи: (`writeModel / index.js):

const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');

const SimpleNodeCQRSwriteModel = ({
  dispatchCommand,
  handleCommand,
  getChatOfId,
  getNextMessageId,
  saveMessage,
}) => {
  handleCommand(
    commands.types.SEND_MESSAGE,
    handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
  );
};

module.exports = {
  SimpleNodeCQRSwriteModel,
};

Ваши commands и command handler не являютсяСвязанные вместе, вы можете затем обеспечить реализацию этих функций во время выполнения, например, с базой данных в памяти и узлом EventEmitter (writeModel/infrastructure/inMemory/index.js):

const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');

const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
  const listeners = [];

  const db = {
    ...initialDbState,
  };

  const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);

  const updateDb = updater => {
    updater();
    listeners.map(listener => listener(db));
  };

  const saveMessageInMemory = saveMessage(async messageState => {
    updateDb(() => (db.messages[messageState.id] = messageState));
  });

  const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);

  const getNextMessageUuid = getNextMessageId(uuid);

  return {
    addOnDbUpdatedListener,
    saveMessage: saveMessageInMemory,
    getChatOfId: getChatOfIdFromMemory,
    getNextMessageId: getNextMessageUuid,
  };
};

module.exports = {
  InMemoryRepository,
};

И нашим TestWriteModel, связывающим еговсе вместе:

const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');

const TestWriteModel = () => {
  const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
  const commandEmitter = new EventEmitter();
  const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
  const handleCommand = (commandType, commandHandler) => {
    commandEmitter.on(commandType, commandHandler);
  };
  return SimpleNodeCQRSwriteModel({
    dispatchCommand,
    handleCommand,
    getChatOfId,
    getNextMessageId,
    saveMessage,
  });
};

Вы можете погрузиться в код (с очень простым read model) в этом репозитории: простой узел cqrs

...