DynamoDB: обработка исключения ProvisionedThroughputExceededException без увеличения емкости чтения / записи - PullRequest
1 голос
/ 09 июля 2020

Есть ли хороший способ обработать ProvisionedThroughputExceededException без увеличения емкости чтения / записи таблицы? В идеале я бы хотел уменьшить пропускную способность на стороне сценария, чтобы исключение не запускалось.

function saveDataToDynamodb(data, scriptName){
    let items = [];
    var processItemsCallback = function (err, data) {
      if (err) {
        console.log(err);
        setTimeout(() => {console.log('sleep for error')}, Math.floor(0.5 * 1000 + 1000));
      }else {
        if (Object.keys(data.UnprocessedItems).length > 0) {
          //setTimeout(() => {console.log('sleep to wait')}, Math.floor(Math.random() * 1000 + 1000));
          var params = {};
          params.RequestItems = data.UnprocessedItems;
          docClient.batchWrite(params, processItemsCallback);
        }
      }
    };
    data.forEach(station => {
      let stationName = station.stationName
      if (stationName == null) {
        stationName = "null"
      }
      //console.log(station.stationId + "_" + stationName)
      let pollutants = station.pollutants
      let unique_set = new Set()
      let aqiArr = [];
      let sortedKey;
      let timestamp;
      pollutants.forEach(pollutant => {
        timestamp = pollutant.utcMoment
        sortedKey = station.stationId + "_" + timestamp + "_" + station.latitude + "_" + station.longitude + "_" + pollutant.pollutant;
        if (!unique_set.has(sortedKey)) { 
          unique_set.add(sortedKey)
          if (!scriptName.includes("Pollen")) { // calculate AQI if it's not pollen data
            let polValue = processPollutant(pollutant.value, pollutant.pollutant);
            let pollutantAQI;
            if (pollutant.pollutant == "o3" && pollutant.avgInterval == "1h") {
              pollutantAQI = aqiCalc.calcEpaAqi(polValue, pollutant.pollutant, true);
            } else {
              pollutantAQI = aqiCalc.calcEpaAqi(polValue, pollutant.pollutant, false);
            }
            //console.log("polValue: " + polValue + ", Pollutant value: " + pollutant.value + ", pollutant: " + pollutant.pollutant + ", AQI: " + pollutantAQI)
            aqiArr.push(pollutantAQI);
          }
          items.push({
            PutRequest: {
              Item: {
                 "stationId": station.stationId,
                 "stationName": String(station.stationName),
                 "latitude": station.latitude,
                 "longitude": station.longitude,
                 "pollutant": pollutant.pollutant,
                 "avgInterval": pollutant.avgInterval,
                 "units": pollutant.units,
                 "initialUnits": pollutant.initialUnits,
                 "value": pollutant.value,
                 "sortedKey": sortedKey,
                 "timestamp": timestamp
              }
            }
          });
          if (items.length >= 24) {
             let params = {
               RequestItems: {
                 pollutantFullData: items
               }
             };
             docClient.batchWrite(params, processItemsCallback);
             items = []
           }
        }
      });
      if (aqiArr.length > 0) {
        var finalAqi = getMaxAsInteger(aqiArr);
        if (finalAqi > 500) {
          finalAqi = 500;
        }
        items.push({
          PutRequest: {
            Item: {
              "stationId": station.stationId,
              "stationName": String(station.stationName),
              "latitude": station.latitude,
              "longitude": station.longitude,
              "pollutant": "aqi",
              "avgInterval": "1h",
              "units": "ppb",
              "initialUnits": "ppm",
              "value": finalAqi,
              "sortedKey": station.stationId + "_" + timestamp + "_" + station.latitude + "_" + station.longitude + "_aqi",
              "timestamp": timestamp
            }
          }
       });
      }
      if (items.length > 0) {
        let params = {
         RequestItems: {
           pollutantFullData: items
         }
       };
       docClient.batchWrite(params, processItemsCallback);
      }
     });
  }

Кажется, что операция batchWrite выполняется асинхронно, так что различные потоки попадают в конечную точку API одновременно, что вызывает ошибку. Как можно изменить предыдущий код для устранения ошибки?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...