BatchWrite в AWS Динамо дБ пропуская некоторые элементы - PullRequest
0 голосов
/ 08 февраля 2020

Я пытаюсь записать элементы в AWS динамо-базу данных с использованием узла SDK. Проблема, с которой я сталкиваюсь, заключается в том, что когда я записываю элементы пакета в AWS в параллельно , используя потоки, некоторые элементы не записываются в базу данных. Количество написанных предметов случайное. Например, если я выполню свой код 3 раза, то в один раз это будет 150, затем - 200, а в третий раз - 135. Кроме того, когда я пишу элементы последовательно без потоков, даже тогда некоторые элементы не написано. Однако в этом случае элементы меньше отсутствуют. Например, если общее количество элементов равно 300, то записанных элементов - 298. Я исследовал проблему, чтобы увидеть, есть ли какие-либо необработанные элементы, но метод batchWrite ничего не возвращает. Это означает, что все предметы обрабатываются правильно. Обратите внимание, что у меня есть OnDemand Provision для моей соответствующей базы данных, поэтому я не ожидаю каких-либо проблем регулирования. Так вот мой код.

 exports.run = async function() {

  **This is the function which runs first !!!!!**

  const data = await getArrayOfObjects();
  console.log("TOTAL PRICE CHANGES")  
  console.log(data.length)
  const batchesOfData = makeBatches(data)
  const threads = new Set();
  console.log("**********")
  console.log(batchesOfData.length)
  console.log("**********")
  for(let i = 0; i < batchesOfData.length; i++) {
    console.log("BATCH!!!!!")
    console.log(i)
    console.log(batchesOfData[i].length)  
    // Sequential Approach
    const response = await compensationHelper.createItems(batchesOfData[i])
    console.log("RESPONSE")
    console.log(response)

    Parallel approach
    // const workerResult = await runService(batchesOfData[i])
    // console.log("WORKER RESUULT!!!!")
    // console.log(workerResult);

  }
}

exports.updateItemsInBatch = async function(data, tableName) {
  console.log("WRITING DATA")
  console.log(data.length)
  const batchItems = {
    RequestItems: {},
  };

  batchItems.RequestItems[tableName] = data;
  try {
    const result = await documentClient.batchWrite(batchItems).promise();
    console.log("UNPROCESSED ITEMS")
    console.log(result)
    if (result instanceof Error) {
      console.log(`[Error]: ${JSON.stringify(Error)}`);
      throw new Error(result);
    }
    return Promise.resolve(true);
  } catch (err) {
    console.error(`[Error]: ${JSON.stringify(err.message)}`);
    return Promise.reject(new Error(err));
  }
};

exports.convertToAWSCompatibleFormat = function(data) {
  const awsCompatibleData = [];
  data.forEach(record => awsCompatibleData.push({ PutRequest: { Item: record } }));
  return awsCompatibleData;
};

const createItems = async function(itemList) {
  try {
    const objectsList = [];
    for (let index = 0; index < itemList.length; index++) {
      try {
        const itemListObj = itemList[index];
        const ObjToBeInserted = {
          // some data assignments here
        };

        objectsList.push(ObjToBeInserted);
        if (
          objectsList.length >= AWS_BATCH_SIZE ||
          index === itemList.length - 1
        ) {
            const awsCompatiableFormat = convertToAWSCompatibleFormat(
              objectsList
            );
            await updateItemsInBatch(
              awsCompatiableFormat,
              process.env.myTableName
            );
        }
      } catch (error) {
        console.log(`[Error]: ${JSON.stringify(error)}`);
      }
    }

    return Promise.resolve(true);
  } catch (err) {
    return Promise.reject(new Error(err));
  }
};

const makeBatches = products => {
  const productBatches = [];
  let countr = -1;
  for (let index = 0; index < products.length; index++) {
    if (index % AWS_BATCH_SIZE === 0) {
      countr++;
      productBatches[countr] = [];
      if (countr === MAX_BATCHES) {
        break;
      }
    }
    try {
      productBatches[countr].push(products[index]);
    } catch (error) {
      continue;
    }
  }
  return productBatches;
};

async function runService(workerData) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.join(__dirname, './worker.js'), { workerData });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    })
  })
}

// My worker file
'use strict';

const { workerData, parentPort } = require('worker_threads')
const creatItems = require('myscripts')
// You can do any heavy stuff here, in a synchronous way
// without blocking the "main thread"
console.log("I AM A NEW THREAD")
createItems(workerData)
// console.log('Going to write tons of content on file '+workerData);
parentPort.postMessage({ fileName: workerData, status: 'Done' })

Ответы [ 3 ]

0 голосов
/ 08 февраля 2020

Я не вижу, где вы проверяете ответ для UnprocessedItems. Пакетные операции часто возвращают список элементов, которые он не обрабатывал. Как задокументировано, BatchWriteItem"может записывать до 16 МБ данных, которые могут содержать до 25 запросов на установку или удаление."

0 голосов
/ 14 февраля 2020

У меня возникла проблема с дублирующимися ключами, что означает, что у первичного и ключа сортировки были одинаковые значения в пакете, однако в моем случае эта ошибка не возвращалась из метода AWS BatchWrite, если моя метка времени была в долях секунд 2020-02-09T08:02:36.71 Это было немного удивительно. Я решил проблему, сделав так, что созданный мной (ключ сортировки) был более детализирован, как это => 2020-02-09T08:02:36.7187, что сделало его неповторяющимся.

0 голосов
/ 08 февраля 2020

Из документации boto3 :

Если выполняется одно или несколько из следующих условий, DynamoDB отклоняет всю операцию пакетной записи:

One or more tables specified in the BatchWriteItem request does not exist.
Primary key attributes specified on an item in the request do not match those in the corresponding table's primary key schema.
You try to perform multiple operations on the same item in the same BatchWriteItem request. For example, you cannot put and delete the same item in the same BatchWriteItem request.
Your request contains at least two items with identical hash and range keys (which essentially is two put operations).
There are more than 25 requests in the batch.
Any individual item in a batch exceeds 400 KB.
The total request size exceeds 16 MB.

Мне кажется, что это правда. На моей работе у нас также была проблема, что одна партия содержала 2 идентичных первичных и вторичных ключа в пакете, поэтому вся партия была отброшена. Я знаю, что это не node.js, но мы использовали это для преодоления этой проблемы.

Это batch_writer(overwrite_by_pkeys), и оно используется для перезаписи последнего вхождения того же первичного и последнего ключа в партии. Если только небольшая часть ваших данных является дубликатом данных и вам не нужно их сохранять, вы можете использовать это. НО, если вам нужно сохранить все ваши данные, я не советую вам использовать эту функцию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...