Я использую пример фрагмента для Amazon Athena просто для проверки вставки некоторых данных. Я не могу сказать, почему это не работает, и журналы CloudWatch не показывают никакого вывода, когда выполнение инструкции завершено. Даже когда я изменяю его на простое выражение select, я не вижу никакого вывода. Я знаю, что запрос, база данных и таблица в порядке, потому что когда я тестирую их с помощью редактора запросов Athena, он выполняется без проблем.
module.exports.dlr = async event => {
let awsFileCreds = {
accessKeyId: "XXX",
secretAccessKey: "XXX"
};
let creds = new AWS.Credentials(awsFileCreds);
AWS.config.credentials = creds;
let client = new AWS.Athena({ region: "eu-west-1" });
let q = Queue((id, cb) => {
startPolling(id)
.then(data => {
return cb(null, data);
})
.catch(err => {
console.log("Failed to poll query: ", err);
return cb(err);
});
}, 5);
const sql = "INSERT INTO delivery_receipts (status, eventid, mcc, mnc, msgcount, msisdn, received, userreference) VALUES ('TestDLR', 345345, 4353, '5345435', 234, '345754', 234, '8833')"
makeQuery(sql)
.then(data => {
console.log("Row Count: ", data.length);
console.log("DATA: ", data);
})
.catch(e => {
console.log("ERROR: ", e);
});
function makeQuery(sql) {
return new Promise((resolve, reject) => {
let params = {
QueryString: sql,
ResultConfiguration: { OutputLocation: ATHENA_OUTPUT_LOCATION },
QueryExecutionContext: { Database: ATHENA_DB }
};
client.startQueryExecution(params, (err, results) => {
if (err) return reject(err);
q.push(results.QueryExecutionId, (err, qid) => {
if (err) return reject(err);
return buildResults(qid)
.then(data => {
return resolve(data);
})
.catch(err => {
return reject(err);
});
});
});
});
}
function buildResults(query_id, max, page) {
let max_num_results = max ? max : RESULT_SIZE;
let page_token = page ? page : undefined;
return new Promise((resolve, reject) => {
let params = {
QueryExecutionId: query_id,
MaxResults: max_num_results,
NextToken: page_token
};
let dataBlob = [];
go(params);
function go(param) {
getResults(param)
.then(res => {
dataBlob = _.concat(dataBlob, res.list);
if (res.next) {
param.NextToken = res.next;
return go(param);
} else return resolve(dataBlob);
})
.catch(err => {
return reject(err);
});
}
function getResults() {
return new Promise((resolve, reject) => {
client.getQueryResults(params, (err, data) => {
if (err) return reject(err);
var list = [];
let header = buildHeader(
data.ResultSet.ResultSetMetadata.ColumnInfo
);
let top_row = _.map(_.head(data.ResultSet.Rows).Data, n => {
return n.VarCharValue;
});
let resultSet =
_.difference(header, top_row).length > 0
? data.ResultSet.Rows
: _.drop(data.ResultSet.Rows);
resultSet.forEach(item => {
list.push(
_.zipObject(
header,
_.map(item.Data, n => {
return n.VarCharValue;
})
)
);
});
return resolve({
next: "NextToken" in data ? data.NextToken : undefined,
list: list
});
});
});
}
});
}
function startPolling(id) {
return new Promise((resolve, reject) => {
function poll(id) {
client.getQueryExecution({ QueryExecutionId: id }, (err, data) => {
if (err) return reject(err);
if (data.QueryExecution.Status.State === "SUCCEEDED")
return resolve(id);
else if (
["FAILED", "CANCELLED"].includes(data.QueryExecution.Status.State)
)
return reject(
new Error(`Query ${data.QueryExecution.Status.State}`)
);
else {
setTimeout(poll, POLL_INTERVAL, id);
}
});
}
poll(id);
});
}
function buildHeader(columns) {
return _.map(columns, i => {
return i.Name;
});
}
return { message: 'Go Serverless v1.0! Your function executed successfully!', event };
};