Привет Stackoverflow. Я годами использую stackoverflow, чтобы найти ответы, и это моя первая попытка задать вопрос самостоятельно. Поэтому не стесняйтесь говорить мне, если я делаю это неправильно.
В настоящее время я занимаюсь разработкой аналитической системы данных на основе микросервисной архитектуры. Предполагается, что эта система будет состоять из десятка самодостаточных микросервисов, взаимодействующих друг с другом с помощью RabbitMQ. Каждый из них инкапсулирован в docker-контейнере, и вся система работает от Docker-Swarm в производственном процессе.
В частности, каждый микросервис является приложением node.js и связанной базой данных, связанной с некоторым интерфейсом ORM. Его задача - управлять данными и обслуживать их в режиме CRUD, а также предоставлять результаты некоторых подготовленных запросов на основе содержащихся данных. Ничего необычного.
Для обеспечения связи микросервис-микросервис я предполагаю использовать amqplib . Но способ его использования пока неизвестен.
Мой текущий вопрос состоит в том, как использовать amqplib ООП для связи межсервисной коммуникационной сети с объектными функциональными возможностями приложения? Под ООП я имею в виду возможность замены amqplib (и самого RabbitMQ) в долгосрочной перспективе без необходимости вносить изменения в логику, связанную с данными.
То, что я действительно ищу, это пример работающей в данный моментмикросервисное приложение с использованием AMQP. Я был бы очень признателен, если бы кто-нибудь дал ссылку на него.
И вторая часть моего вопроса. Имеет ли смысл создавать микросервисное приложение, основанное на принципах, управляемых событиями, и просто передавать сообщения из RabbitMQ в основную очередь событий приложения? Таким образом, каждая процедура будет вызываться одинаково, несмотря на то, что это внутреннее или внешнее событие.
Что касается абстрактного примера одиночного микросервиса: допустим, у меня есть служба событий и прослушиватель, подключенный кэтот сервис:
class UserManager {
constructor(eventService) {
this.eventService = eventService;
this.eventServce.on("users.user.create-request", (payload) => {
User.create(payload); // User interface is omitted in this example
}
}
}
const eventService = new EventEmmiter();
const userManager = new UserManager(eventService);
С другой стороны, у меня есть соединение RabbitMQ, которое ждет сообщений:
const amqp = require('amqplib');
amqp.connect('amqp-service-in-docker').then(connection => {
connection.createChannel().then(channel => {
// Here we use topic type of exchange to be able to filter only related messages
channel.assertExchange('some-exchange', 'topic');
channel.assertQueue('').then(queue => {
// And here we are waiting only the related messages
channel.bind(queue.queue, 'some-exchange', 'users.*');
channel.consume(queue.queue, message => {
// And here is the crucial part
}
}
}
}
В настоящее время я думаю только о том, чтобы просто проанализировать иперешлите это сообщение в eventService и используйте его ключ маршрутизации в качестве имени события, например:
channel.consume(query.query, message => {
const eventName = message.fields.routingKey;
const eventPayload = JSON.parse(message.content.toString());
eventService.emit(eventName, eventPayload);
}
Но как насчет RPC? Должен ли я сделать другой обмен или даже канал для них с другим подходом, что-то вроде:
// In RPC channel
channel.consume(query.query, message => {
eventService.once('users.user.create-response', response => {
const recipient = message.properites.replyTo;
const correlationId = msg.properties.correlationId;
// Send response to specified recipient
channel.sendToQueue(
recipient,
Buffer.from(JSON.stringify(resonse)),
{
correlationId: correlationId
}
);
channel.ack(message);
});
// Same thing
const eventName = message.fields.routingKey;
const eventPayload = JSON.parse(message.content.toString());
eventService.emit(eventName, eventPayload);
}
И тогда мой класс User должен запускать событие 'users.user.create-response' каждый раз, когда создает новыйпользователь. Разве это не костыль?