Невозможно приостановить пул при потоковой передаче данных с узлом mysql - PullRequest
0 голосов
/ 23 апреля 2020

Я использую узел и пакет mysql для потоковой передачи данных с узла на клиент.

Идея в том,

определить пул и запросы на основе пула.

Затем передайте потоковые строки в массив.

Если длина этого массива достигает длины, приостановите поток, обработайте строки, отправьте их клиенту через веб-сокеты.

Возобновить поток. Повторяйте, пока не останется никаких других рядов.

Я слежу за примерами на странице mysql npm, но получаю pool.pause is not a function

Вот код

var pool  = mysql.createPool({
  connectionLimit : 100,
  host            : config.host,
  user            : config.user,
  password        : config.password,
  database        : config.database
});

//turn simple queries to promises
const query = (str, ar) => {
  return new Promise(function(resolve, reject) {
    pool.query(str, ar, function (error, results, fields) { 
      if (error) {
          return reject(error);
      }
      resolve({results, fields});
    });
  })//promise
}


const userdetails = (ws, data) => {

//do a check, unrelated to streaming
   query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
  .then((data)=>{  
     if(data.results[0].countrows > 5000){
        // if more than 5000, we stream
        // the following is based on the mysql code found in their page
        // it has no relation to the promise-based query above
        var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category])

        query.on('result', row => {  
          rowsToProcess.push(row);
          if (rowsToProcess.length >= 100) { 
            pool.pause();
            processRows();
          }
        });

        query.on('end', () => {
          processRows();
        });

       const processRows = (done) => {
         //process some data
         //send them back using websockets  
         ws.send(JSON.stringify({ data })); 
         pool.resume();  
       }
     }
  })
}

Не знаю, так ли это связано с выполнением простого запроса, обещанием или использованием пула или чего-либо еще. Это дает TypeError: pool.pause is not a function, и я не могу это исправить. Пожалуйста, совет.

Спасибо

Ответы [ 2 ]

1 голос
/ 23 апреля 2020

Попробуйте, я использовал это много раз

    const mysqlStreamQueryPromise = (queryString, params) => {
        return new Promise((resolve, reject) => {
            let streamData = connection.query(queryString,params).stream();
            let data = [];
            streamData.on('data', item => {
                streamData.pause();
                data.push(item);
                streamData.resume();
            });
            streamData.on('end', end => {
                return resolve(data);
            });
            streamData.on('error', error => {
                return reject(error);
            });
        });
    }
0 голосов
/ 23 апреля 2020

Используйте это

    var pool  = mysql.createPool({
        connectionLimit : 100,
        host            : config.host,
        user            : config.user,
        password        : config.password,
        database        : config.database
      });

      //turn simple queries to promises
      const query = (str, ar) => {
        return new Promise(function(resolve, reject) {
          pool.query(str, ar, function (error, results, fields) { 
            if (error) {
                return reject(error);
            }
            resolve({results, fields});
          });
        })//promise
      }


      const userdetails = (ws, data) => {

      //do a check, unrelated to streaming
         query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
        .then((data)=>{  
           if(data.results[0].countrows > 5000){
              // if more than 5000, we stream
              // the following is based on the mysql code found in their page
              // it has no relation to the promise-based query above
              var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category]).stream();

              query.on('result', row => {  
                rowsToProcess.push(row);
                if (rowsToProcess.length >= 100) { 
                  pool.pause();
                  processRows();
                }
              });

              query.on('end', () => {
                processRows();
              });

             const processRows = (done) => {
               //process some data
               //send them back using websockets  
               ws.send(JSON.stringify({ data })); 
               pool.resume();  
             }
           }
        })
      }
...