Как правильно реализовать rabbitMQ в микросервисном приложении node.js? - PullRequest
0 голосов
/ 29 октября 2019

Привет Stackoverflow. Я годами использую stackoverflow, чтобы найти ответы, и это моя первая попытка задать вопрос самостоятельно. Поэтому не стесняйтесь говорить мне, если я делаю это неправильно.

В настоящее время я занимаюсь разработкой аналитической системы данных на основе микросервисной архитектуры. Предполагается, что эта система будет состоять из десятка самодостаточных микросервисов, взаимодействующих друг с другом с помощью RabbitMQ. Каждый из них инкапсулирован в docker-контейнере, и вся система работает от Docker-Swarm в производственном процессе.

В частности, каждый микросервис является приложением node.js и связанной базой данных, связанной с некоторым интерфейсом ORM. Его задача - управлять данными и обслуживать их в режиме CRUD, а также предоставлять результаты некоторых подготовленных запросов на основе содержащихся данных. Ничего необычного.

Для обеспечения связи микросервис-микросервис я предполагаю использовать amqplib . Но способ его использования пока неизвестен.

Мой текущий вопрос состоит в том, как использовать amqplib ООП для связи межсервисной коммуникационной сети с объектными функциональными возможностями приложения? Под ООП я имею в виду возможность замены amqplib (и самого RabbitMQ) в долгосрочной перспективе без необходимости вносить изменения в логику, связанную с данными.

То, что я действительно ищу, это пример работающей в данный моментмикросервисное приложение с использованием AMQP. Я был бы очень признателен, если бы кто-нибудь дал ссылку на него.

И вторая часть моего вопроса. Имеет ли смысл создавать микросервисное приложение, основанное на принципах, управляемых событиями, и просто передавать сообщения из RabbitMQ в основную очередь событий приложения? Таким образом, каждая процедура будет вызываться одинаково, несмотря на то, что это внутреннее или внешнее событие.

Что касается абстрактного примера одиночного микросервиса: допустим, у меня есть служба событий и прослушиватель, подключенный кэтот сервис:

class UserManager {
  constructor(eventService) {
    this.eventService = eventService;
    this.eventServce.on("users.user.create-request", (payload) => {
      User.create(payload); // User interface is omitted in this example 
    }
  }
}

const eventService = new EventEmmiter();
const userManager = new UserManager(eventService);

С другой стороны, у меня есть соединение RabbitMQ, которое ждет сообщений:

const amqp = require('amqplib');

amqp.connect('amqp-service-in-docker').then(connection => {
  connection.createChannel().then(channel => {
    // Here we use topic type of exchange to be able to filter only related messages 
    channel.assertExchange('some-exchange', 'topic'); 

    channel.assertQueue('').then(queue => {
      // And here we are waiting only the related messages 
      channel.bind(queue.queue, 'some-exchange', 'users.*');

      channel.consume(queue.queue, message => {
        // And here is the crucial part
      }
    }
  }    
}

В настоящее время я думаю только о том, чтобы просто проанализировать иперешлите это сообщение в eventService и используйте его ключ маршрутизации в качестве имени события, например:

channel.consume(query.query, message => {
  const eventName = message.fields.routingKey;
  const eventPayload = JSON.parse(message.content.toString());
  eventService.emit(eventName, eventPayload);
}

Но как насчет RPC? Должен ли я сделать другой обмен или даже канал для них с другим подходом, что-то вроде:

// In RPC channel
channel.consume(query.query, message => {
  eventService.once('users.user.create-response', response => {
    const recipient = message.properites.replyTo;
    const correlationId = msg.properties.correlationId;

    // Send response to specified recipient
    channel.sendToQueue(
      recipient,
      Buffer.from(JSON.stringify(resonse)),
      { 
        correlationId: correlationId
      }
    );

    channel.ack(message);
  });

  // Same thing
  const eventName = message.fields.routingKey;
  const eventPayload = JSON.parse(message.content.toString());
  eventService.emit(eventName, eventPayload);
}

И тогда мой класс User должен запускать событие 'users.user.create-response' каждый раз, когда создает новыйпользователь. Разве это не костыль?

...