Я пытаюсь записать элементы в AWS динамо-базу данных с использованием узла SDK. Проблема, с которой я сталкиваюсь, заключается в том, что когда я записываю элементы пакета в AWS в параллельно , используя потоки, некоторые элементы не записываются в базу данных. Количество написанных предметов случайное. Например, если я выполню свой код 3 раза, то в один раз это будет 150, затем - 200, а в третий раз - 135. Кроме того, когда я пишу элементы последовательно без потоков, даже тогда некоторые элементы не написано. Однако в этом случае элементы меньше отсутствуют. Например, если общее количество элементов равно 300, то записанных элементов - 298. Я исследовал проблему, чтобы увидеть, есть ли какие-либо необработанные элементы, но метод batchWrite ничего не возвращает. Это означает, что все предметы обрабатываются правильно. Обратите внимание, что у меня есть OnDemand Provision для моей соответствующей базы данных, поэтому я не ожидаю каких-либо проблем регулирования. Так вот мой код.
exports.run = async function() {
**This is the function which runs first !!!!!**
const data = await getArrayOfObjects();
console.log("TOTAL PRICE CHANGES")
console.log(data.length)
const batchesOfData = makeBatches(data)
const threads = new Set();
console.log("**********")
console.log(batchesOfData.length)
console.log("**********")
for(let i = 0; i < batchesOfData.length; i++) {
console.log("BATCH!!!!!")
console.log(i)
console.log(batchesOfData[i].length)
// Sequential Approach
const response = await compensationHelper.createItems(batchesOfData[i])
console.log("RESPONSE")
console.log(response)
Parallel approach
// const workerResult = await runService(batchesOfData[i])
// console.log("WORKER RESUULT!!!!")
// console.log(workerResult);
}
}
exports.updateItemsInBatch = async function(data, tableName) {
console.log("WRITING DATA")
console.log(data.length)
const batchItems = {
RequestItems: {},
};
batchItems.RequestItems[tableName] = data;
try {
const result = await documentClient.batchWrite(batchItems).promise();
console.log("UNPROCESSED ITEMS")
console.log(result)
if (result instanceof Error) {
console.log(`[Error]: ${JSON.stringify(Error)}`);
throw new Error(result);
}
return Promise.resolve(true);
} catch (err) {
console.error(`[Error]: ${JSON.stringify(err.message)}`);
return Promise.reject(new Error(err));
}
};
exports.convertToAWSCompatibleFormat = function(data) {
const awsCompatibleData = [];
data.forEach(record => awsCompatibleData.push({ PutRequest: { Item: record } }));
return awsCompatibleData;
};
const createItems = async function(itemList) {
try {
const objectsList = [];
for (let index = 0; index < itemList.length; index++) {
try {
const itemListObj = itemList[index];
const ObjToBeInserted = {
// some data assignments here
};
objectsList.push(ObjToBeInserted);
if (
objectsList.length >= AWS_BATCH_SIZE ||
index === itemList.length - 1
) {
const awsCompatiableFormat = convertToAWSCompatibleFormat(
objectsList
);
await updateItemsInBatch(
awsCompatiableFormat,
process.env.myTableName
);
}
} catch (error) {
console.log(`[Error]: ${JSON.stringify(error)}`);
}
}
return Promise.resolve(true);
} catch (err) {
return Promise.reject(new Error(err));
}
};
const makeBatches = products => {
const productBatches = [];
let countr = -1;
for (let index = 0; index < products.length; index++) {
if (index % AWS_BATCH_SIZE === 0) {
countr++;
productBatches[countr] = [];
if (countr === MAX_BATCHES) {
break;
}
}
try {
productBatches[countr].push(products[index]);
} catch (error) {
continue;
}
}
return productBatches;
};
async function runService(workerData) {
return new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, './worker.js'), { workerData });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
})
})
}
// My worker file
'use strict';
const { workerData, parentPort } = require('worker_threads')
const creatItems = require('myscripts')
// You can do any heavy stuff here, in a synchronous way
// without blocking the "main thread"
console.log("I AM A NEW THREAD")
createItems(workerData)
// console.log('Going to write tons of content on file '+workerData);
parentPort.postMessage({ fileName: workerData, status: 'Done' })