Обработчик лямбда-функции AWS не вставляется в Athena - PullRequest
0 голосов
/ 30 октября 2019

Я использую пример фрагмента для Amazon Athena просто для проверки вставки некоторых данных. Я не могу сказать, почему это не работает, и журналы CloudWatch не показывают никакого вывода, когда выполнение инструкции завершено. Даже когда я изменяю его на простое выражение select, я не вижу никакого вывода. Я знаю, что запрос, база данных и таблица в порядке, потому что когда я тестирую их с помощью редактора запросов Athena, он выполняется без проблем.

module.exports.dlr = async event => {

  let awsFileCreds = {
    accessKeyId: "XXX",
    secretAccessKey: "XXX"
  };

  let creds = new AWS.Credentials(awsFileCreds);
  AWS.config.credentials = creds;

  let client = new AWS.Athena({ region: "eu-west-1" });

  let q = Queue((id, cb) => {
    startPolling(id)
      .then(data => {
        return cb(null, data);
      })
      .catch(err => {
        console.log("Failed to poll query: ", err);
        return cb(err);
      });
  }, 5);

    const sql = "INSERT INTO delivery_receipts (status, eventid, mcc, mnc, msgcount, msisdn, received, userreference) VALUES ('TestDLR', 345345, 4353, '5345435', 234, '345754', 234, '8833')"

  makeQuery(sql)
    .then(data => {
      console.log("Row Count: ", data.length);
      console.log("DATA: ", data);
    })
    .catch(e => {
      console.log("ERROR: ", e);
    });

  function makeQuery(sql) {
    return new Promise((resolve, reject) => {
      let params = {
        QueryString: sql,
        ResultConfiguration: { OutputLocation: ATHENA_OUTPUT_LOCATION },
        QueryExecutionContext: { Database: ATHENA_DB }
      };

      client.startQueryExecution(params, (err, results) => {
        if (err) return reject(err);
        q.push(results.QueryExecutionId, (err, qid) => {
          if (err) return reject(err);
          return buildResults(qid)
            .then(data => {
              return resolve(data);
            })
            .catch(err => {
              return reject(err);
            });
        });
      });
    });
  }

  function buildResults(query_id, max, page) {
    let max_num_results = max ? max : RESULT_SIZE;
    let page_token = page ? page : undefined;
    return new Promise((resolve, reject) => {
      let params = {
        QueryExecutionId: query_id,
        MaxResults: max_num_results,
        NextToken: page_token
      };

      let dataBlob = [];
      go(params);

      function go(param) {
        getResults(param)
          .then(res => {
            dataBlob = _.concat(dataBlob, res.list);
            if (res.next) {
              param.NextToken = res.next;
              return go(param);
            } else return resolve(dataBlob);
          })
          .catch(err => {
            return reject(err);
          });
      }

      function getResults() {
        return new Promise((resolve, reject) => {
          client.getQueryResults(params, (err, data) => {
            if (err) return reject(err);
            var list = [];
            let header = buildHeader(
              data.ResultSet.ResultSetMetadata.ColumnInfo
            );
            let top_row = _.map(_.head(data.ResultSet.Rows).Data, n => {
              return n.VarCharValue;
            });
            let resultSet =
              _.difference(header, top_row).length > 0
                ? data.ResultSet.Rows
                : _.drop(data.ResultSet.Rows);
            resultSet.forEach(item => {
              list.push(
                _.zipObject(
                  header,
                  _.map(item.Data, n => {
                    return n.VarCharValue;
                  })
                )
              );
            });
            return resolve({
              next: "NextToken" in data ? data.NextToken : undefined,
              list: list
            });
          });
        });
      }
    });
  }

  function startPolling(id) {
    return new Promise((resolve, reject) => {
      function poll(id) {
        client.getQueryExecution({ QueryExecutionId: id }, (err, data) => {
          if (err) return reject(err);
          if (data.QueryExecution.Status.State === "SUCCEEDED")
            return resolve(id);
          else if (
            ["FAILED", "CANCELLED"].includes(data.QueryExecution.Status.State)
          )
            return reject(
              new Error(`Query ${data.QueryExecution.Status.State}`)
            );
          else {
            setTimeout(poll, POLL_INTERVAL, id);
          }
        });
      }
      poll(id);
    });
  }

  function buildHeader(columns) {
    return _.map(columns, i => {
      return i.Name;
    });
  }

  return { message: 'Go Serverless v1.0! Your function executed successfully!', event };
};

1 Ответ

0 голосов
/ 30 октября 2019

Разобрался. Использовать лямбда-события aws с athena легко, используя пакет athena-express . Вы можете указать свою конфигурацию и запросить базу данных athena, как обычно, используя значительно меньше кода, чем предусмотрено в примере amazon athena nodejs.

Это код, который я использовал для достижения результата:

"use strict";

const AthenaExpress = require("athena-express"),
    aws = require("aws-sdk");

const athenaExpressConfig = {
    aws,
    db: "messaging",
    getStats: true
};
const athenaExpress = new AthenaExpress(athenaExpressConfig);

exports.handler = async event => {
    const sqlQuery = "SELECT * FROM delivery_receipts LIMIT 3";

    try {
        let results = await athenaExpress.query(sqlQuery);
        return results;
    } catch (error) {
        return error;
    }
};
...