MongoDB Change Stream watcher был запущен при втором выполнении функции aws-lambda или не был запущен вообще - PullRequest
1 голос
/ 30 марта 2019

Я создаю приложение Angular PWA с сервером nodejs. После нескольких исследований я решил поместить свой внешний интерфейс на S3, а внутренний - на aws-lambda с использованием serverless-framework и соединить его с атласом Монго.

В моем бэкэнде я использую mongoose.watch() для прослушивания соответствующих изменений в моей БД. Когда происходят изменения, я использую FCM для отправки данных на устройство в соответствии с токеном.

После запуска приложения я отправляю 3-4 запроса (различные пути), чтобы получить данные из моей БД.

Так вот моя проблема:

Если я использую sls offline start --skipCacheInvalidation для локальной работы, все работает почти нормально - я получил 2 или более дублирований, поэтому каждый мой стартовый запрос начинается с функции aws-lambda.

Как только я sls deploy получаю все свои начальные данные, но как только я обновляю DB (другим запросом) в потоках журналов aws-lambda, я не вижу, что мой код достигает .on('change') слушателя. И выполнение aws-лямбда-запроса заканчивается. Иногда мой код достигает этого слушателя и печатает данные потока изменений моего предыдущего запроса.

Вот часть моего serverless.yml

provider:
  name: aws
  runtime: nodejs8.10
  stage: dev
  region: us-east-1
  memorySize: 128
  timeout: 30
  versionFunctions: false

functions:
  app:
    handler: server.handler
    events:
      - http:
          method: any
          path: /{proxy+}
          cors:
            origin: '*'
            headers:
              - Content-Type
              - X-Amz-Date
              - Authorization
              - X-Api-Key
              - X-Amz-Security-Token
              - X-Amz-User-Agent
              - x-customauthheader
            allowCredentials: false

server.js

require('dotenv').config({ path: './variables.env' });
const sls = require('serverless-http');
const app = require('./lib/app');
module.exports.handler = sls(app, { callbackWaitsForEmptyEventLoop: false });

app.js

const app = require('express')();
const bodyParser = require('body-parser');
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// // Allow CORS support and remote requests to the service
app.use((req, res, next) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  next();
});

const { checkHeaders } = require('../bin/routes-handler');
app.use(checkHeaders);

require('./db');
const routes = require('./routes');
app.use('/api', routes);

module.exports = app;

db.js

const mongoose = require('mongoose');
mongoose.connect(
  process.env.DB,
  { useNewUrlParser: true, useCreateIndex: true, useFindAndModify: false },
);
const db = mongoose.connection;
db.on('error', console.error.bind(console, 'Connection Error:'));
db.once('open', () => {
  console.log('mongoose Connection established');
  require('../services/quote-watcher')(db);
});

и моя quote-watcher.js

const { sendDataByTokens } = require('../bin/notify-devices');

module.exports = db => {
  'use strict';
  const taskCollection = db.collection('quotes');
  const changeStream = taskCollection.watch([], { fullDocument: 'updateLookup' });
  // const changeStream = taskCollection.watch();

  changeStream.on('error', err => {
    console.log(err);
  });
  changeStream.on('change', change => {
    console.log(change);
    if (change.operationType === 'insert') {
      return sendDataByTokens(
        change.fullDocument.participants,
        change.operationType,
        change.fullDocument,
        change.ns.coll,
        null,
        { stage: change.fullDocument.currStage },
      );
    }
    if (change.operationType === 'update') {
      const { updatedFields } = change.updateDescription;
      return sendDataByTokens(
        change.fullDocument.participants,
        change.operationType,
        change.fullDocument,
        change.ns.coll,
        change.documentKey._id,
        { stage: change.fullDocument.currStage, updatedFields },
      );
    }
  });
};

...