Есть ли хороший способ обработать ProvisionedThroughputExceededException без увеличения емкости чтения / записи таблицы? В идеале я бы хотел уменьшить пропускную способность на стороне сценария, чтобы исключение не запускалось.
function saveDataToDynamodb(data, scriptName){
let items = [];
var processItemsCallback = function (err, data) {
if (err) {
console.log(err);
setTimeout(() => {console.log('sleep for error')}, Math.floor(0.5 * 1000 + 1000));
}else {
if (Object.keys(data.UnprocessedItems).length > 0) {
//setTimeout(() => {console.log('sleep to wait')}, Math.floor(Math.random() * 1000 + 1000));
var params = {};
params.RequestItems = data.UnprocessedItems;
docClient.batchWrite(params, processItemsCallback);
}
}
};
data.forEach(station => {
let stationName = station.stationName
if (stationName == null) {
stationName = "null"
}
//console.log(station.stationId + "_" + stationName)
let pollutants = station.pollutants
let unique_set = new Set()
let aqiArr = [];
let sortedKey;
let timestamp;
pollutants.forEach(pollutant => {
timestamp = pollutant.utcMoment
sortedKey = station.stationId + "_" + timestamp + "_" + station.latitude + "_" + station.longitude + "_" + pollutant.pollutant;
if (!unique_set.has(sortedKey)) {
unique_set.add(sortedKey)
if (!scriptName.includes("Pollen")) { // calculate AQI if it's not pollen data
let polValue = processPollutant(pollutant.value, pollutant.pollutant);
let pollutantAQI;
if (pollutant.pollutant == "o3" && pollutant.avgInterval == "1h") {
pollutantAQI = aqiCalc.calcEpaAqi(polValue, pollutant.pollutant, true);
} else {
pollutantAQI = aqiCalc.calcEpaAqi(polValue, pollutant.pollutant, false);
}
//console.log("polValue: " + polValue + ", Pollutant value: " + pollutant.value + ", pollutant: " + pollutant.pollutant + ", AQI: " + pollutantAQI)
aqiArr.push(pollutantAQI);
}
items.push({
PutRequest: {
Item: {
"stationId": station.stationId,
"stationName": String(station.stationName),
"latitude": station.latitude,
"longitude": station.longitude,
"pollutant": pollutant.pollutant,
"avgInterval": pollutant.avgInterval,
"units": pollutant.units,
"initialUnits": pollutant.initialUnits,
"value": pollutant.value,
"sortedKey": sortedKey,
"timestamp": timestamp
}
}
});
if (items.length >= 24) {
let params = {
RequestItems: {
pollutantFullData: items
}
};
docClient.batchWrite(params, processItemsCallback);
items = []
}
}
});
if (aqiArr.length > 0) {
var finalAqi = getMaxAsInteger(aqiArr);
if (finalAqi > 500) {
finalAqi = 500;
}
items.push({
PutRequest: {
Item: {
"stationId": station.stationId,
"stationName": String(station.stationName),
"latitude": station.latitude,
"longitude": station.longitude,
"pollutant": "aqi",
"avgInterval": "1h",
"units": "ppb",
"initialUnits": "ppm",
"value": finalAqi,
"sortedKey": station.stationId + "_" + timestamp + "_" + station.latitude + "_" + station.longitude + "_aqi",
"timestamp": timestamp
}
}
});
}
if (items.length > 0) {
let params = {
RequestItems: {
pollutantFullData: items
}
};
docClient.batchWrite(params, processItemsCallback);
}
});
}
Кажется, что операция batchWrite выполняется асинхронно, так что различные потоки попадают в конечную точку API одновременно, что вызывает ошибку. Как можно изменить предыдущий код для устранения ошибки?