Запускайте HTTP-запросы в чанках - PullRequest
0 голосов
/ 21 марта 2020

Я хочу запустить 1 прогремевший http-запрос в настраиваемых чанках и установить настраиваемое время ожидания между чанк-запросами. Запрос основан на данных, предоставленных файлом some.csv.

Он не работает, потому что я получаю TypeError, но когда я удаляю () после f, он тоже не работает. Я был бы очень благодарен за небольшую помощь. Вероятно, самая большая проблема заключается в том, что я не совсем понимаю, как именно работают обещания, но я попробовал несколько решений и не смог добиться того, чего хочу.

Функция тайм-аута, вероятно, доставит мне еще больше головной боли поэтому я был бы признателен за любые советы для этого тоже.

Не могли бы вы помочь мне понять, почему это не работает?

Вот фрагмент:

const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');

const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];

async function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }

    return async () => { return await rp(options) };
}

async function batchRequestRunner(data) {
    const promises = [];
    for (row of data) {
        row = row.split(',');
        promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
    }
    const batches = chunk(promises, BATCH_SIZE);

    for (let batch of batches) {
        try {
            Promise.all(
                batch.map(async f => { return await f();})
            ).then((resp) => console.log(resp));
        } catch (e) {
            console.log(e);
        }
    }
}

async function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);

    await batchRequestRunner(requestData);
}

main();

Уточнение для первого комментария:

У меня есть CSV-файл, который выглядит следующим образом:

clientId,startTime
123,13:40:00
321,13:50:00

размер файла составляет ~ 100 тыс. Строк, файл содержит информацию о том, как обновить время для определенного clientId в базе данных. У меня нет доступа к базе данных, но у меня есть доступ к API, который позволяет обновлять записи в базе данных. Я не могу сделать 100 000 вызовов одновременно, потому что: моя сеть ограничена (я работаю удаленно из-за коронавируса), она потребляет много памяти, и API также может быть ограничен и может обработать sh, если я сделаю все запросы в один раз.

Чего я хочу достичь:

  • Загрузить CSV в память, преобразовать его в массив

  • Обрабатывать API запросы в чанках, например, берут первые две строки из массива, делают вызов API на основе первых двух строк, ждут 1000 мс, берут еще две строки и продолжают обработку до конца массива (файл csv)

1 Ответ

0 голосов
/ 21 марта 2020

Что ж, похоже, это несколько классический c случай, когда вы хотите обработать массив значений с некоторой асинхронной операцией, и чтобы избежать потребления слишком большого количества ресурсов или перегрузки целевого сервера, вам не нужно больше чем N запросов в полете одновременно. Это общая проблема, для которой есть готовые решения. Мое решение goto представляет собой небольшой фрагмент кода под названием mapConcurrent(). Он аналогичен array.map(), но предполагает асинхронный обратный вызов, возвращающий обещание, и вы передаете ему максимальное количество элементов, которые должны быть когда-либо в полете одновременно. Затем он возвращает вам обещание, которое преобразуется в массив результатов.

Вот mapConcurrent():

// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) {

        function runNext() {
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function(err) {
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            });
        }

        function run() {
            // launch as many as we're allowed to
            while (!stop && inflightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) {
                resolve(results);
            }
        }

        run();
    });
}

Ваш код может быть структурирован для использования следующим образом:

function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }
    return rp(options);
}

function processRow(row) {
    let rowData = row.split(",");
    return update(rowData[0], rowData[1], rowData[2]);
}


function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);    

    // process this entire array with up to 5 requests "in-flight" at the same time
    mapConcurrent(requestData, 5, processRow).then(results => {
        console.log(results);
    }).catch(err => {
        console.log(err);
    });
}

Очевидно, что вы можете настроить количество одновременных запросов на любое количество, которое вы хотите. Я установил его на 5 здесь в этом примере.

...