Я создал приложение для извлечения данных Node.js, которое отправляет и извлекает данные с информационных серверов наших поставщиков (разные типы данных) и в конечном итоге становится массивом объектов. Это приложение будет получать данные по расписанию и сохранять их в базе данных SQL.
На стороне SQL у меня есть sprocs, которые будут принимать уникальное имя таблицы и данные строки, будут динамически создавать таблицу и столбцы, если они не существуют (один вызов sproc), а затем будут выполнять несколько запросов для каждогообъект в массиве для добавления каждой строки данных.
Моя проблема заключается в том, что я получаю сообщение об ошибке "Не удалось подключиться к (серверу): 1433 - Не удалось подключиться (последовательность)" при запуске многих (более 10 000)запросы сразу.
Я использую Bluebird.map с параллелизмом, установленным на 150, чтобы попытаться ограничить количество открытых запросов, но это не решает мою проблему. Я согласен с тем, что это заняло время, но я хотел бы, если это возможно, запускать группы запросов, чтобы максимально повысить эффективность.
Я использовал Bluebird.each, и это, похоже, помогло решить проблему, но замедлило процесс почти в десять раз.
mssql config
const x = require( 'dotenv' ).config( { path : '../.env' } );
const dwObj = {
domain : process.env.DATABASE_DOMAIN,
user : process.env.DATABASE_USER,
password : process.env.DATABASE_PASSWORD,
server : process.env.DATABASE_SERVER,
database : process.env.DATABASE_NAME_DW,
stream : true,
requestTimeout : 60000,
options : {
encrypt : false, // Use this if you're on Microsoft Azure
},
pool : {
max : 60000
}
};
module.exports = dwObj;
executeSproc function
const mssql = require( 'mssql' );
const dwConfig = require( '../config/datawarehouse.mssql' );
const indicatorConfig = require( '../config/indicator.mssql' );
/**
* executeSproc
*
* Executes a sproc ??. Based on a pre-defined query,
* known as a "stored procedure", build a request with a
* set of known parameters to execute on one of the
* DBs. An engineer may find out what parameters belong to
* what procedure, but this is not presently documented.
* When you ask for a procedure to be created, the
* one who creates the procedure will also outline the
* requirements for that sproc.
*
* ! TODO : refactor for error handling.
* ! TODO : optimize request pooling.
*/
module.exports = function executeSproc( sprocInfo, onRow ) {
return new Promise( async ( resolve, reject ) => {
const { database, parameters, procedure } = sprocInfo;
const results = [];
let config;
/**
* Intelligently pick which config to use based on the
* provided `sprocInfo`. This allows each individual
* controller to manage its own state and we don't have
* to worry about threads overriding env variables
* mid-flight.
*/
if ( database === 'indicator' ) {
config = indicatorConfig;
} else if ( database === 'datawarehouse' ) {
config = dwConfig;
}
const pool = await new mssql.ConnectionPool( config ).connect();
const request = await pool.request();
request.stream = true;
request.on( 'row', ( row ) => {
let newRow = row;
if ( onRow ) {
newRow = onRow( row );
}
results.push( newRow );
} );
request.on( 'error', ( error ) => {
console.error( error );
console.log( sprocInfo );
reject( error );
} );
request.on( 'done', () => {
pool.close();
return resolve( results );
} );
parameters.forEach( ( { name, value } ) =>
request.input( name, mssql.NVarChar, value )
);
request.execute( procedure );
} );
};
Функция зацикливания данных
const executeSproc = require( '../helpers/executeSproc' );
const prepareTableSave = require( '../helpers/prepareTableSave' );
const insertTableColumns = require( '../helpers/insertTableColumns' );
const bluebird = require( 'bluebird' );
const cleanseAndStringifyForSQLJSON = ( rowObj ) => {
const dataObj = Object.keys( rowObj ).reduce( ( acc, cur ) => {
const propData = rowObj[cur];
acc[ cur ] = propData.replace( /'+/g, '' );
return acc;
}, {} ) || {};
return JSON.stringify( dataObj );
};
module.exports = async ( name, data, incremental ) => {
if ( data && data[0] ) {
await insertTableColumns( name, data[0] );
if ( !incremental ) {
await prepareTableSave( name );
}
const sprocInfos = data.map( ( dataRow ) => {
return {
database : 'datawarehouse',
procedure : 'DataFetch.dbo.sp0001saveRowData',
parameters : [
{ name : 'dataExtractName', value : name },
{ name : 'dataRow', value : cleanseAndStringifyForSQLJSON( dataRow ) }
],
};
} );
// return columns;
// Loop through sprocs and execute them
return await bluebird.map( sprocInfos, sprocInfo => executeSproc( sprocInfo ), { concurrency : 150 } );
// return sprocInfos[0];
}
};