У меня есть 3 службы nodejs, которые обмениваются сообщениями с помощью веб-сокетов.
Первый сервис отправляет события второму через веб-сокеты. Второй сервис отвечает за разработку событий и сохранение их в базе данных mongodb, после того как событие было сохранено в базе данных, второй сервис отправляет разработанное событие в третий сервис (графический интерфейс), всегда используя веб-сокеты.
Это код второго сервиса, который получает события от первого:
this.socketClient.on('NEW_EVENT', (event: MyEvent) => {
winston.info(`[newEvent] New Event received: %s`, event.type);
console.log('0__', event.payload);
// Search for the right class that can handle the specific event
for (const handler of this.handlers) {
if (handler.isApplicable(event.type)) {
// Handler found
handler.handle(event).then((handledEvent) => {
console.log('1__', handledEvent);
// Send the handled event to the gui
this.webSockerService.publishNewEvent(event.type, handledEvent);
});
return;
}
}
});
Это код обработчика, который отвечает за обработку одного конкретного события (в этом случае он просто вызывает нужную службу для обработки события):
public async handle(event: Event): Promise<Component> {
winston.info('[ComponentHandler:handle]');
return this.componentService
.componentStatusChanged(event.header.sender, event.header.senderType, event.payload);
}
Это код моего класса componentService (в данном конкретном случае он просто использует правильный репозиторий, отвечающий за сохранение события в базе данных)
public async componentStatusChanged(componentId: string, componentType: ComponentType, componentStatus: ComponentStatus): Promise<any> {
return this.componentRepository
.componentStatusChanged(componentId, componentType, componentStatus);
}
А это код моего компонента хранилища:
public async componentStatusChanged(componentId: string, componentType: ComponentType, componentStatus: ComponentStatus): Promise<Component> {
winston.info('[ComponentRepository::componentStatusChanged] Component %s newStatus %s', componentId, componentStatus);
try {
await this.fetch({ name: componentId });
return this.update(componentId, { status: componentStatus });
} catch (ex) {
// Component not exists
const newComponent: Component = {
name: componentId,
type: componentType,
status: componentStatus,
updatedAt: new Date().getTime()
};
return this.newComponent(newComponent);
}
}
public async update(componentId: string, newValues: any): Promise<Component> {
winston.info('[ComponentRepository::update] Updating component %s. New Values: %j', componentId, newValues);
await this.component.get().update({ name: componentId }, newValues);
return this.fetch({ name: componentId });
}
public async newComponent(component: Component): Promise<Component> {
winston.info('[ComponentRepository::newComponent] Creating Component %j', component);
return this.component.get().create(component);
}
public async fetch(filter: any): Promise<Component> {
winston.info('[ComponentRepository::fetch] component %j', filter);
return this.component.get().findOne(filter);
}
проблема в том, что второй сервис получает события в правильном порядке, но он перенаправляет их в графический интерфейс в неправильном порядке, следуя некоторым журналам:
Полезная нагрузка событий, полученная вторым компонентом: STARTED -> ANALYZING -> LISTENING -> ANALYZING -> LISTENING -> ANALYZING -> LISTENING .....
Как вы можете видеть в журналах, порядок событий, напечатанных console.log('0__...)
, не совпадает с событиями, напечатанными console.log('1___....)
info: [newEvent] New Event received: STATUS
0___ STARTED
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus STARTED
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"STARTED"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'ANALYZING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS ANALYZING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'ANALYZING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'ANALYZING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
name: 'auth.srv',
type: 'AUTH_SERVICE',
status: 'LISTENING',
updatedAt: 1525969070608,
__v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING'
EDIT
Кажется, что-то вроде состояния гонки, когда я использую WebSockets. Я попытался проверить свои компоненты отдельно, и нет никаких проблем. Как только я использую веб-сокеты и публикую события быстро и многократно, что-то происходит в стеке обратного вызова.
Что я могу сделать, чтобы обработать события в правильном порядке? Кажется, проблема в том, что я пытаюсь выполнить операцию ввода-вывода на mongodb.
Спасибо