kafka для запроса выбора DB2 не возвращает никакого результата - PullRequest
0 голосов
/ 10 июля 2020
const dbUrl = "DRIVER={DB2};".concat("DATABASE=").concat(database).concat("UID=").concat(uid).concat("PWD=").concat(dbpwd).concat("HOSTNAME=").concat(dbHostName).concat("port=").concat(port).concat(";Security=").concat(Security);

const ibmdb = require("ibm_db");

//open the connection with ibm_db
var Pool = require("ibm_db").Pool;
var pool = new Pool();
var con = pool.init(5, dbUrl);
// pool.setMaxPoolSize(1);
if (con !== true) {
  console.log("Exceeded maximum limit of 5 connections. Connection refused " + dbHostName + " failed ");
} else {
  console.log("DB connection pool initialization to " + dbHostName + " done successfully!");
}
const connectAndExecuteQuery = async (user_query, params = {}) => {
  console.log(query)
  return new Promise(function (resolve, reject) {
    pool.open(dbUrl, function (err, conn) {
      if (err) {
        console.log("Pool is open err");
        return reject("Error getting connection from pool");
      } else {
        try {
          conn.query(user_query, params, function (err, data) {
            if (err) {
              console.log(err)
              //Cleaning pool
              pool.close(function (a, b) {
              });
              return reject("Error from DB: " + err + ". Connection pool has been purged");

            } else {
              return resolve(data);
            }
          }.bind(this));
        } catch (ex) {
          return reject(ex);
        } finally {
          conn.close(function (err) {
            if (err) {
            } else {
            }
          });
        }

      }
    }.bind(this));

  });
}
processMessage(data,offset){
 let query = 
    `SELECT * FROM ` + schema + `.TableNAME as t WHERE t.column = '${value}'`;
   connectAndExecuteQuery(query).then((resultSet) => {
      console.log(resultSet)
      if (resultSet.length > 0) {}
});
}
//consumer.js
consumer.on('data', async function (data) {
  
    try {
     
      consumer.pause([topic]);
      await processMessage(data, data.offset);
      consumer.commit();
      consumer.resume([topic]);
    } catch (error) {
      console.log('data consuming error', error);
    }
  
});

** с использованием библиотеки node-rdkafka для работы с производителем и потребителем, напишите topi c для производителя, и потребитель будет читать из topi c и обрабатывать эту запись в DB2.am, выполняя запрос выбора, вызывая processMessage (), но это не результат перенастройки. можем ли мы напрямую подключиться от потребителя к базе данных? может ли кто-нибудь помочь мне с проблемой **

...