ограничение количества параллельных запросов к cassandra db в nodejs - PullRequest
0 голосов
/ 20 ноября 2018

Я в настоящее время анализирую файл и получаю его данные, чтобы вставить их в мою базу данных.Для этого я сделал массив запросов и выполняю их через цикл.

Проблема в том, что я ограничен 2048 параллельными запросами.

Это код, который я сделал:

index.js =>

const ImportClient = require("./scripts/import_client_leasing")
const InsertDb = require("./scripts/insertDb")

const cassandra = require('cassandra-driver');
const databaseConfig = require('./config/database.json');


const authProvider = new cassandra.auth.PlainTextAuthProvider(databaseConfig.cassandra.username, databaseConfig.cassandra.password);

const db = new cassandra.Client({
    contactPoints: databaseConfig.cassandra.contactPoints,
    authProvider: authProvider
});

ImportClient.clientLeasingImport().then(queries => { // this function parse the data and return an array of query
    return InsertDb.Clients(db, queries);    //inserting in the database returns something when all the promises are done
}).then(result => {
    return db.shutdown(function (err, result) {});
}).then(result => {
    console.log(result);
}).catch(error => {
    console.log(error)
});

insertDb.js =>

module.exports = {
    Clients: function (db, queries) {
        DB = db;
        return insertClients(queries);
    }
}

function insertClients(queries) {
    return new Promise((resolve, reject) => {
        let promisesArray = [];

        for (let i = 0; i < queries.length; i++) {
            promisesArray.push(new Promise(function (resolve, reject) {
                DB.execute(queries[i], function (err, result) {
                    if (err) {
                        reject(err)
                    } else {
                        resolve("success");
                    }
                });
            }));
        }
        Promise.all(promisesArray).then((result) => {
            resolve("success");
        }).catch((error) => {
            resolve("error");
        });
    });
}

Я пробовал несколько вещей, например добавление функции ожидания, которая устанавливает тайм-аут в моем дляцикл каждые x секунд (но он не работает, потому что я уже в обещании), я также пытался с p-queue и p-limit, но, похоже, он тоже не работает.

I 'Я вроде застрял здесь, я думаю, что упускаю что-то тривиальное, но я не понимаю, что.

Спасибо

Ответы [ 2 ]

0 голосов
/ 21 февраля 2019

При отправке нескольких запросов параллельно (функция execute() использует асинхронное выполнение), вы в конечном итоге ставите в очередь на одном из разных уровней: на стороне драйвера, в сетевом стеке или на стороне сервера.Чрезмерные очереди влияют на общее время, необходимое для завершения каждой операции.Вы должны ограничить количество одновременных запросов в любое время, также известное как уровень параллелизма, чтобы получить высокую пропускную способность и низкую задержку.

Когда вы думаете о реализации этого в своем коде, вы должны рассмотреть возможность запуска фиксированного количества асинхронныхвыполнения, используя ваш уровень параллелизма в качестве ограничения и добавляя новые операции только после завершения выполнения в этом ограничении.

Вот пример того, как ограничить количество одновременных выполнений при обработке элементов в цикле: https://github.com/datastax/nodejs-driver/blob/master/examples/concurrent-executions/execute-in-loop.js

В двух словах:

// Launch in parallel n async operations (n being the concurrency level)
for (let i = 0; i < concurrencyLevel; i++) {
  promises[i] = executeOneAtATime();
}

// ...
async function executeOneAtATime() {
  // ...
  // Execute queries asynchronously in sequence
  while (counter++ < totalLength) {;
    await client.execute(query, params, options);
  }
}
0 голосов
/ 22 ноября 2018

Хорошо, поэтому я нашел обходной путь для достижения своей цели.Я записал в файл все мои запросы

const fs = require('fs')
fs.appendFileSync('my_file.cql', queries[i] + "\n");

и затем использовал

child_process.exec("cqls --file my_file", function(err, stdout, stderr){})"

, чтобы вставить в Кассандру все мои запросы

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...