Вызов нескольких AWS Lambdas не создает параллельные процессы - PullRequest
2 голосов
/ 17 июня 2019

Я пытаюсь вызвать несколько лямбда-функций (одна лямбда-функция, которая будет запускать отдельные параллельные процессы) из другой лямбда-функции.Сначала выполняется cron lambda, который просто запрашивает документы из db, а затем вызывает другую lambda с параметрами doc.Эта cron lambda запускается каждые пять минут и правильно запрашивает документы.Я тестировал вторую лямбду с двумя документами.Проблема в том, что каждый раз, когда вызывается вторая лямда, он обрабатывает только один документ - каждый раз, когда он обрабатывает другой, который он не обработал при предыдущем вызове:

Пример:

  • doc 1
  • doc 2

Первый вызов второй лямбды -> процесс doc 1

Второй вызов второй лямбды -> процесс doc 2

Третий вызов второй лямбды -> процесс документа 1

Четвертый вызов второй лямбда -> процесс документа 2

и т. Д.

Первый (cron) лямбда-код:

aws.config.update({
  region : env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});

const lambda = new aws.Lambda({
  region: env.lambdaRegion,
});

exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;

  return new Promise(async (resolve, reject) => {
    for (let i = 0; i < 100; i++) {
      const doc = await mongo.db.collection('docs').
        findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );

      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };

        lambda.invoke(params);
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    resolve();
  });
};

Вторая лямбда-функция:

exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;

  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    return await processDoc(doc);
  } else {
    throw new Error('doc ID is not present.');
  }
};

Ответы [ 2 ]

0 голосов
/ 15 июля 2019

Ключом было создание нового отдельного экземпляра aws.Lambda() для каждой лямбды, которую мы хотим вызвать, затем мы должны разрешить и дождаться каждой лямбды, которую мы вызвали (массив Promieses).Это нормально, если вызванную лямбду не нужно ожидать, поэтому мы не тратим время на обработку AWS - поэтому вызванная лямбда начинает обработку, а затем разрешается, не дожидаясь своего ответа, чтобы разрешить основную (cron) лямбду.

Исправлен (cron) лямбда-обработчик:

aws.config.update({
  region : env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});

exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;

  return new Promise(async (resolve, reject) => {
    const promises: any = [];
    for (let i = 0; i < 100; i++) {
      const doc = await global['mongo'].db.collection('docs').
        findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );

      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };

        const lambda = new aws.Lambda({
          region: env.lambdaRegion,
          maxRetries: 0,
        });

        promises.push(
          new Promise((invokeResolve, invokeReject) => {
            lambda.invoke(params, (error, data) => {
              if (error) { console.error('ERROR: ', error); }
              if (data) { console.log('SUCCESS:', data); }
              // Resolve invoke promise in any case.
              invokeResolve();
            });
          }),
        );
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    await Promise.all(promises);
    resolve();
  });
};

Второй (обработка) лямбда:

exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;

  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    processDoc(doc);
    return ctx.succeed('Completed.');
  } else {
    throw new Error('Doc ID is not present.');
  }
};

Я не знаю, есть ли лучший способ добиться этого с помощьюстрого лямбда-функции, но это работает.

0 голосов
/ 17 июня 2019

Для параллельного запуска нескольких лямбд без «некрасивого» решения cronjob я бы рекомендовал использовать пошаговые функции AWS с типом Parallel.Вы можете настроить логику в вашем serverless.yml, сами вызовы функций являются лямбда-функциями.Вы можете передать данные вторым аргументом callback.Если объем данных превышает 32 КБ, я бы порекомендовал использовать корзину / базу данных S3.

Пример serverless.yml

stepFunctions:
  stateMachines:
    test:
      name: 'test'
      definition:
        Comment: "Testing tips-like state structure"
        StartAt: GatherData
        States:
          GatherData:
            Type: Parallel
            Branches:
              -
                StartAt: GatherDataA
                States:
                  GatherDataA:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
                    TimeoutSeconds: 15
                    End: true
              -
                StartAt: GatherDataB
                States:
                  GatherDataB:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
                    TimeoutSeconds: 15
                    End: true
            Next: ResolveData
          ResolveData:
            Type: Task
            Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
            TimeoutSeconds: 15
            End: true

Пример обработчиков

module.exports.firstA = (event, context, callback) => {
  const data = {
    id: 3,
    somethingElse: ['Hello', 'World'],
  };
  callback(null, data);
};
module.exports.firstB = (event, context, callback) => {
  const data = {
    id: 12,
    somethingElse: ['olleH', 'dlroW'],
  };
  callback(null, data);
};

module.exports.resolveAB = (event, context, callback) => {
  console.log("resolving data from a and b: ", event);
  const [dataFromA, dataFromB] = event;
  callback(null, event);
};

Подробнее см.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...