Nodejs: неправильный порядок обратного вызова - PullRequest
0 голосов
/ 10 мая 2018

У меня есть 3 службы nodejs, которые обмениваются сообщениями с помощью веб-сокетов.

Первый сервис отправляет события второму через веб-сокеты. Второй сервис отвечает за разработку событий и сохранение их в базе данных mongodb, после того как событие было сохранено в базе данных, второй сервис отправляет разработанное событие в третий сервис (графический интерфейс), всегда используя веб-сокеты.

Это код второго сервиса, который получает события от первого:

this.socketClient.on('NEW_EVENT', (event: MyEvent) => {
 winston.info(`[newEvent] New Event received: %s`, event.type);
 console.log('0__', event.payload);

 // Search for the right class that can handle the specific event
 for (const handler of this.handlers) {
   if (handler.isApplicable(event.type)) {
       // Handler found
       handler.handle(event).then((handledEvent) => {
          console.log('1__', handledEvent);  
          // Send the handled event to the gui
          this.webSockerService.publishNewEvent(event.type, handledEvent);
       });
       return;
  }
 }
});

Это код обработчика, который отвечает за обработку одного конкретного события (в этом случае он просто вызывает нужную службу для обработки события):

public async handle(event: Event): Promise<Component> {
  winston.info('[ComponentHandler:handle]');
  return this.componentService
               .componentStatusChanged(event.header.sender, event.header.senderType, event.payload);
}

Это код моего класса componentService (в данном конкретном случае он просто использует правильный репозиторий, отвечающий за сохранение события в базе данных)

public async componentStatusChanged(componentId: string, componentType: ComponentType, componentStatus: ComponentStatus): Promise<any> {
  return this.componentRepository
                   .componentStatusChanged(componentId, componentType, componentStatus);
}

А это код моего компонента хранилища:

public async componentStatusChanged(componentId: string, componentType: ComponentType, componentStatus: ComponentStatus): Promise<Component> {
  winston.info('[ComponentRepository::componentStatusChanged] Component %s newStatus %s', componentId, componentStatus);

  try {
    await this.fetch({ name: componentId });
    return this.update(componentId, { status: componentStatus });
  } catch (ex) {
    // Component not exists
    const newComponent: Component = {
                name: componentId,
                type: componentType,
                status: componentStatus,
                updatedAt: new Date().getTime()
     };
     return this.newComponent(newComponent);
  }
}

public async update(componentId: string, newValues: any): Promise<Component> {
  winston.info('[ComponentRepository::update] Updating component %s. New Values: %j', componentId, newValues);
  await this.component.get().update({ name: componentId }, newValues);
  return this.fetch({ name: componentId });
}

public async newComponent(component: Component): Promise<Component> {
  winston.info('[ComponentRepository::newComponent] Creating Component %j', component);
  return this.component.get().create(component);
}

public async fetch(filter: any): Promise<Component> {
  winston.info('[ComponentRepository::fetch] component %j', filter);
  return this.component.get().findOne(filter);
}

проблема в том, что второй сервис получает события в правильном порядке, но он перенаправляет их в графический интерфейс в неправильном порядке, следуя некоторым журналам:

Полезная нагрузка событий, полученная вторым компонентом: STARTED -> ANALYZING -> LISTENING -> ANALYZING -> LISTENING -> ANALYZING -> LISTENING .....

Как вы можете видеть в журналах, порядок событий, напечатанных console.log('0__...), не совпадает с событиями, напечатанными console.log('1___....)

info: [newEvent] New Event received: STATUS
0___ STARTED
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus STARTED
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"STARTED"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ ANALYZING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [newEvent] New Event received: STATUS
0___ LISTENING
info: [ComponentHandler:handle]
info: [ComponentRepository::componentStatusChanged] Component auth.srv newStatus LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"ANALYZING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::update] Updating component auth.srv. New Values: {"status":"LISTENING"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'ANALYZING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS ANALYZING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'ANALYZING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'ANALYZING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS ANALYZING
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
info: [ComponentRepository::fetch] component {"name":"auth.srv"}
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING
1____ { _id: 5af470ae8278f6320b3a00f3,
  name: 'auth.srv',
  type: 'AUTH_SERVICE',
  status: 'LISTENING',
  updatedAt: 1525969070608,
  __v: 0 }
info: [WebSocketService::publishNewEvent] type: STATUS LISTENING'

EDIT

Кажется, что-то вроде состояния гонки, когда я использую WebSockets. Я попытался проверить свои компоненты отдельно, и нет никаких проблем. Как только я использую веб-сокеты и публикую события быстро и многократно, что-то происходит в стеке обратного вызова. Что я могу сделать, чтобы обработать события в правильном порядке? Кажется, проблема в том, что я пытаюсь выполнить операцию ввода-вывода на mongodb. Спасибо

1 Ответ

0 голосов
/ 11 мая 2018

Я решил проблему с созданием очереди событий, которая обрабатывается отдельно.

Если у вас есть лучшее решение, пожалуйста, опубликуйте его здесь.

constructor() {
 setInterval(async () => {
   if (this.newEvents.length > 0) {
     await this.handleNewEvents();
   }
 }, 50);
}

this.socketClient.on('NEW_EVENT', (event: MyEvent) => {
   this.newEvents.push(event);
});


private async handleNewEvents(): Promise<any> {
  const event = this.newEvents.shift();
  await this.handleEvent(event);
  if (this.newEvents.length > 0) {
     await this.handleNewEvents();
  }
}

private async handleEvent(event: Event): Promise<any> {
   return new Promise((resolve, reject) => {
       for (const handler of this.handlers) {
           if (handler.isApplicable(event.type)) {
                handler.handle(event).then((handledEvent) => {
                  this.webSockerService.publishNewEvent(event.type, handledEvent);
                      resolve();
                  });
                  return;
               }
            }
        winston.error('Handler not found for %s ', event.type);
           reject();
        });
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...