Как исправить «Не удалось подключиться (последовательность)» с использованием Node mssql со многими запросами - PullRequest
0 голосов
/ 06 ноября 2019

Я создал приложение для извлечения данных Node.js, которое отправляет и извлекает данные с информационных серверов наших поставщиков (разные типы данных) и в конечном итоге становится массивом объектов. Это приложение будет получать данные по расписанию и сохранять их в базе данных SQL.

На стороне SQL у меня есть sprocs, которые будут принимать уникальное имя таблицы и данные строки, будут динамически создавать таблицу и столбцы, если они не существуют (один вызов sproc), а затем будут выполнять несколько запросов для каждогообъект в массиве для добавления каждой строки данных.

Моя проблема заключается в том, что я получаю сообщение об ошибке "Не удалось подключиться к (серверу): 1433 - Не удалось подключиться (последовательность)" при запуске многих (более 10 000)запросы сразу.

Я использую Bluebird.map с параллелизмом, установленным на 150, чтобы попытаться ограничить количество открытых запросов, но это не решает мою проблему. Я согласен с тем, что это заняло время, но я хотел бы, если это возможно, запускать группы запросов, чтобы максимально повысить эффективность.

Я использовал Bluebird.each, и это, похоже, помогло решить проблему, но замедлило процесс почти в десять раз.

mssql config

const x = require( 'dotenv' ).config( { path : '../.env' } );

const dwObj = {
    domain         : process.env.DATABASE_DOMAIN,
    user           : process.env.DATABASE_USER,
    password       : process.env.DATABASE_PASSWORD,
    server         : process.env.DATABASE_SERVER,
    database       : process.env.DATABASE_NAME_DW,
    stream         : true,
    requestTimeout : 60000,
    options        : {
        encrypt : false, // Use this if you're on Microsoft Azure
    },
    pool : {
        max : 60000
    }
};

module.exports = dwObj;

executeSproc function

const mssql = require( 'mssql' );

const dwConfig = require( '../config/datawarehouse.mssql' );
const indicatorConfig = require( '../config/indicator.mssql' );

/**
 * executeSproc
 *
 * Executes a sproc ??. Based on a pre-defined query,
 * known as a "stored procedure", build a request with a
 * set of known parameters to execute on one of the 
 * DBs. An engineer may find out what parameters belong to
 * what procedure, but this is not presently documented.
 * When you ask for a procedure to be created, the
 * one who creates the procedure will also outline the
 * requirements for that sproc.
 *
 * ! TODO : refactor for error handling.
 * ! TODO : optimize request pooling.
 */

module.exports = function executeSproc( sprocInfo, onRow ) {

    return new Promise( async ( resolve, reject ) => {
        const { database, parameters, procedure } = sprocInfo;
        const results = [];
        let config;

        /**
         * Intelligently pick which config to use based on the
         * provided `sprocInfo`. This allows each individual
         * controller to manage its own state and we don't have
         * to worry about threads overriding env variables
         * mid-flight.
         */
        if ( database === 'indicator' ) {
            config = indicatorConfig;
        } else if ( database === 'datawarehouse' ) {
            config = dwConfig;
        }

        const pool = await new mssql.ConnectionPool( config ).connect();
        const request = await pool.request();
        request.stream = true;

        request.on( 'row', ( row ) => {
            let newRow = row;

            if ( onRow ) {
                newRow = onRow( row );
            }

            results.push( newRow );
        } );

        request.on( 'error', ( error ) => {
            console.error( error );
            console.log( sprocInfo );
            reject( error );
        } );

        request.on( 'done', () => {
            pool.close();
            return resolve( results );
        } );

        parameters.forEach( ( { name, value } ) =>
            request.input( name, mssql.NVarChar, value )
        );

        request.execute( procedure );

    } );

};

Функция зацикливания данных

const executeSproc = require( '../helpers/executeSproc' );
const prepareTableSave = require( '../helpers/prepareTableSave' );
const insertTableColumns = require( '../helpers/insertTableColumns' );
const bluebird = require( 'bluebird' );

const cleanseAndStringifyForSQLJSON = ( rowObj ) => {

    const dataObj = Object.keys( rowObj ).reduce( ( acc, cur ) => {

        const propData = rowObj[cur];
        acc[ cur ] = propData.replace( /'+/g, '' );
        return acc;
    }, {} ) || {};

    return JSON.stringify( dataObj );

};

module.exports = async ( name, data, incremental ) => {

    if ( data && data[0] ) {
        await insertTableColumns( name, data[0] );

        if ( !incremental ) {
            await prepareTableSave( name );
        }

        const sprocInfos = data.map( ( dataRow ) => {

            return {
                database   : 'datawarehouse',
                procedure  : 'DataFetch.dbo.sp0001saveRowData',
                parameters : [
                    { name : 'dataExtractName', value : name },
                    { name : 'dataRow',         value : cleanseAndStringifyForSQLJSON( dataRow ) }
                ],
            };

        } );

        // return columns;
        // Loop through sprocs and execute them
        return await bluebird.map( sprocInfos, sprocInfo => executeSproc( sprocInfo ), { concurrency : 150 } );

        // return sprocInfos[0];
    }

};
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...