Вот схема, которая запускает управляемое пользователем число операций getFile()
параллельно. Вы задаете для переменной maxInFlight
количество параллельных страниц, которые вы хотите запускать (что, вероятно, зависит только от использования вашей памяти или любого ограничения скорости, которое может применить Facebook). Вы должны будете решить, что установить, экспериментируя. Первоначально я установил значение 10, чтобы 10 страниц могли быть «в полете» одновременно.
Общая идея здесь заключается в том, что getFile()
увеличивает / уменьшает inFlightCntr
как показатель количества страницы открываются сразу, а затем csvPipe приостанавливается или возобновляется на основе этого счетчика.
const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');
(async () => {
const browser = await puppeteer.launch();
const maxInFlight = 10; // set this value to control how many pages run in parallel
let inFlightCntr = 0;
let paused = false;
async function getFile(rowId, path) {
try {
++inFlightCntr;
const page = await browser.newPage();
page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
const response = await page.goto(url, { waitUntil: 'networkidle2' });
await page.waitFor(3000);
const body = await page.$('body');
await body.screenshot({
path: path
});
await page.close();
} catch(e) {
console.log(e);
page.close();
} finally {
--inFlightCntr;
}
}
let fname = 'ids.csv'
const csvPipe = fs.createReadStream(fname).pipe(csv());
csvPipe.on('data', async (row) => {
let id = row.ad_id;
console.log(id);
let path = './images/' + id + '.png';
getFile(id, path).finally(() => {
if (paused && inFlightCntr < maxInFlight) {
cvsPipe.resume();
paused = false;
}
});
if (!paused && inFlightCntr >= maxInFlight) {
cvsPipe.pause();
paused = true;
}
}).on('end', () => {
console.log('CSV file successfully processed');
});
})();
Код может быть немного проще, если вы просто запустите csvPipe, чтобы собрать все строки в массив (перед обработкой любого из них). Затем вы можете использовать любое количество функций параллельного выполнения обещаний для обработки массива, одновременно контролируя, сколько из них выполняется параллельно. См. этот ответ от вчерашнего дня для ряда функций, которые позволяют вам управлять параллелизмом при параллельной обработке массива. Вот как будет выглядеть эта реализация:
const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');
(async () => {
const browser = await puppeteer.launch();
const maxInFlight = 10; // set this value to control how many pages run in parallel
const fname = 'ids.csv'
const csvPipe = fs.createReadStream(fname).pipe(csv());
const rowIDs = [];
async function getFile(rowId, path) {
try {
const page = await browser.newPage();
page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
const response = await page.goto(url, { waitUntil: 'networkidle2' });
await page.waitFor(3000);
const body = await page.$('body');
await body.screenshot({
path: path
});
} catch(e) {
console.log(e);
} finally {
await page.close();
}
}
csvPipe.on('data', row => {
rowIDs.push(row.ad_id);
}).on('end', () => {
// all rowIDs in the array now
pMap(rowIDs, (id) => {
let path = './images/' + id + '.png';
return getFile(id, path);
}, maxInFlight).then(() => {
console.log("all items processed"); // all done now
}).catch(err => {
console.log(e);
});
});
})();
// utility function for processing an array asynchronously with
// no more than limit items "in flight" at the same time
function pMap(array, fn, limit) {
return new Promise(function(resolve, reject) {
var index = 0, cnt = 0, stop = false, results = new Array(array.length);
function run() {
while (!stop && index < array.length && cnt < limit) {
(function(i) {
++cnt;
++index;
fn(array[i]).then(function(data) {
results[i] = data;
--cnt;
// see if we are done or should run more requests
if (cnt === 0 && index === array.length) {
resolve(results);
} else {
run();
}
}, function(err) {
// set stop flag so no more requests will be sent
stop = true;
--cnt;
reject(err);
});
})(index);
}
}
run();
});
}