Кукольник: перебирать CSV-файл и снимок экрана для каждой строки? - PullRequest
0 голосов
/ 30 января 2020

Я хотел бы перебрать CSV-файл и использовать puppeteer для скриншота URL для каждой строки в CSV-файле.

У меня есть следующий код, который работает нормально, но каждый запрос ожидает завершения предыдущего, поэтому для его выполнения требуются годы:

const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');

(async () => {
    const browser = await puppeteer.launch();

    const getFile = async function(rowId, path) {
        const page = await browser.newPage();
        page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
        let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
        const response = await page.goto(url, { waitUntil: 'networkidle2' });
        await page.waitFor(3000);
        const body = await page.$('body');
        await body.screenshot({
            path: path
        });
        page.close();
    };

    let fname = 'ids.csv'
    const csvPipe = fs.createReadStream(fname).pipe(csv());
    csvPipe.on('data', async (row) => {
            let id = row.ad_id;
            console.log(id);
            let path = './images/' + id + '.png';
            csvPipe.pause();
            await getFile(id, path);
            csvPipe.resume();
        }).on('end', () => {
            console.log('CSV file successfully processed');
        });
})();

Как выполнить запросы параллельно, чтобы ускорить его?

Если я удаляю строки pause() и resume(), я получаю эту ошибку каждый раз, когда запускается функция:

(node:18610) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 14)
(node:18610) UnhandledPromiseRejectionWarning: TypeError: Cannot read property 'screenshot' of null
    at getFile (/Users/me/Dropbox/Projects/scrape/index.js:29:12)
    at <anonymous>
    at process._tickCallback (internal/process/next_tick.js:189:7)

Ответы [ 3 ]

1 голос
/ 31 января 2020

Вот схема, которая запускает управляемое пользователем число операций getFile() параллельно. Вы задаете для переменной maxInFlight количество параллельных страниц, которые вы хотите запускать (что, вероятно, зависит только от использования вашей памяти или любого ограничения скорости, которое может применить Facebook). Вы должны будете решить, что установить, экспериментируя. Первоначально я установил значение 10, чтобы 10 страниц могли быть «в полете» одновременно.

Общая идея здесь заключается в том, что getFile() увеличивает / уменьшает inFlightCntr как показатель количества страницы открываются сразу, а затем csvPipe приостанавливается или возобновляется на основе этого счетчика.

const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');

(async () => {
    const browser = await puppeteer.launch();

    const maxInFlight = 10;     // set this value to control how many pages run in parallel
    let inFlightCntr = 0;
    let paused = false;

    async function getFile(rowId, path) {
        try {
            ++inFlightCntr;
            const page = await browser.newPage();
            page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
            let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
            const response = await page.goto(url, { waitUntil: 'networkidle2' });
            await page.waitFor(3000);
            const body = await page.$('body');
            await body.screenshot({
                path: path
            });
            await page.close();
        } catch(e) {
            console.log(e);
            page.close();
        } finally {
            --inFlightCntr;
        }
    }

    let fname = 'ids.csv'
    const csvPipe = fs.createReadStream(fname).pipe(csv());
    csvPipe.on('data', async (row) => {
            let id = row.ad_id;
            console.log(id);
            let path = './images/' + id + '.png';
            getFile(id, path).finally(() => {
                if (paused && inFlightCntr < maxInFlight) {
                    cvsPipe.resume();
                    paused = false;
                }
            });
            if (!paused && inFlightCntr >= maxInFlight) {
                cvsPipe.pause();
                paused = true;
            }
        }).on('end', () => {
            console.log('CSV file successfully processed');
        });
})();

Код может быть немного проще, если вы просто запустите csvPipe, чтобы собрать все строки в массив (перед обработкой любого из них). Затем вы можете использовать любое количество функций параллельного выполнения обещаний для обработки массива, одновременно контролируя, сколько из них выполняется параллельно. См. этот ответ от вчерашнего дня для ряда функций, которые позволяют вам управлять параллелизмом при параллельной обработке массива. Вот как будет выглядеть эта реализация:

const csv = require('csv-parser');
const fs = require('fs');
const puppeteer = require('puppeteer');

(async () => {
    const browser = await puppeteer.launch();

    const maxInFlight = 10;     // set this value to control how many pages run in parallel
    const fname = 'ids.csv'
    const csvPipe = fs.createReadStream(fname).pipe(csv());
    const rowIDs = [];

    async function getFile(rowId, path) {
        try {
            const page = await browser.newPage();
            page.setViewport({ width: 1000, height: 1500, deviceScaleFactor: 1 });
            let url = 'https://www.facebook.com/ads/library/?id=' + rowId;
            const response = await page.goto(url, { waitUntil: 'networkidle2' });
            await page.waitFor(3000);
            const body = await page.$('body');
            await body.screenshot({
                path: path
            });
        } catch(e) {
            console.log(e);
        } finally {
            await page.close();
        }
    }

    csvPipe.on('data', row => {
        rowIDs.push(row.ad_id);
    }).on('end', () => {
        // all rowIDs in the array now
        pMap(rowIDs, (id) => {
            let path = './images/' + id + '.png';
            return getFile(id, path);
        }, maxInFlight).then(() => {
             console.log("all items processed");     // all done now
        }).catch(err => {
             console.log(e);
        });
    });
})();


// utility function for processing an array asynchronously with 
// no more than limit items "in flight" at the same time
function pMap(array, fn, limit) {
    return new Promise(function(resolve, reject) {
        var index = 0, cnt = 0, stop = false, results = new Array(array.length);

        function run() {
            while (!stop && index < array.length && cnt < limit) {
                (function(i) {
                    ++cnt;
                    ++index;
                    fn(array[i]).then(function(data) {
                        results[i] = data;
                        --cnt;
                        // see if we are done or should run more requests
                        if (cnt === 0 && index === array.length) {
                            resolve(results);
                        } else {
                            run();
                        }
                    }, function(err) {
                        // set stop flag so no more requests will be sent
                        stop = true;
                        --cnt;
                        reject(err);
                    });
                })(index);
            }
        }
        run();
    });
}   
0 голосов
/ 31 января 2020

Если вы хорошо используете другую библиотеку, вы можете попробовать puppeteer-cluster (отказ от ответственности: я автор). Это решает именно эту проблему.

Вы ставите задачи в очередь и позволяете библиотеке позаботиться о параллелизме:

const cluster = await Cluster.launch({
    concurrency: Cluster.CONCURRENCY_PAGE, // you could also use something different (see docs)
    maxConcurrency: 4, // how many pages in parallel your system can handle
});

// setup your task
await cluster.task(async ({ page, data: { rowId, path } }) => {
    await page.goto(url);
    // ... remaining code
});

// just read everything at once and queue all jobs
let fname = 'ids.csv';
fs.createReadStream(fname).pipe(csv()).on('data',
    (row) => cluster.queue({ id: row.ad_id, path: './images/' + row.ad_id + '.png' })
);

// wait until all jobs are done and close the cluster
await cluster.idle();
await cluster.close();

Этот код устанавливает кластер с 4 работниками (4 страницы браузера) и работает на работах в очереди ({ id: ..., path: ... }).

0 голосов
/ 30 января 2020

Вы можете достичь этого с помощью Promise.all()
Шаг 1: вам нужно создать свои страницы, готовые к использованию:
const pages = await Promise.all([browser.newPage(),browser.newPage()])
Шаг 2: вы можете проанализировать ваш CSV-файл для генерации блоков URL-адресов в соответствии с количеством страниц, созданных на шаге 1.
Вы не загружаете данные, просто анализируете CSV-файл и получаете результаты.
Создайте массив результатов, который должен выглядеть следующим образом: const rows = [url1, url2, ..etc] Затем преобразуйте это в блоки в соответствии с количеством инициализированных страниц.
Примерно так:

const rowPacks = rows.reduce((acc, cur) => {
  if(!acc.length || acc[acc.length - 1].length < pages.length){
    acc.push([cur]);
    return acc;
  }

  acc[acc.length - 1].push(cur);
  return acc;
}, []);


Шаг 3: используйте сгенерированные страницы для обработки ваших данных, примерно так:

const pageDataLoad = async (page, url) => {
   await page.goto(url, { waitUntil: 'networkidle2' });
    await page.waitFor(3000);
    const body = await page.$('body');
    await body.screenshot({
        path: path
    });
} 

while(rowPacks.length){
  const packToUse = rowPacks.splice(0, 1);

  const passedRowsToPages = pages.map((p, pageIndex) => pageDataLoad(p, packToUse[pageIndex]));

  await Promise.all(passedRowsToPages);
}

Просто поиграйтесь с таймаутами и количеством экземпляров страниц, чтобы предотвратить DDOS-атаку на целевые URL-адреса и предотвратить проблемы с расширением памяти.

...