Эффективное считывание, обработка и вставка данных с использованием PG-Promise и PG-Query-Stream - PullRequest
0 голосов
/ 22 февраля 2019

Я хочу сделать следующее.

  1. Запросить большую таблицу с группировкой по запросу для суммирования значений.
  2. Запустите эти записи с помощью подпрограммы, чтобы добавить некоторые дополнительные данные
  3. Эффективно вставьте их в БД.

Я пытался сделать это с помощью pg-поток запросов для чтения данных в виде потока и последующего подсчета этих записей в пакеты, например, по 1000 за раз, и как только мы достигнем предела количества пакетов, мы используем pg-promise pgp.helpers.insert для вставки данных.

Проблема, с которой я столкнулся, заключается в том, что я не могу понять, как заставить поток приостановиться для завершения вставки перед продолжением.Особенно на on.end ()

Код, который я пробовал, ниже

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
  stream.pause()
  const t0 = performance.now()
  let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)

  if (options.onConflictExpression) {
    query += options.onConflictExpression
  }

  tenant.db.none(query)
    .then(() => {
      let t1 = performance.now()
      console.log('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
      stream.resume()
    })
    .catch(error => {
      throw error
    })
}

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
  try {
    return new Promise((resolve, reject) => {
      const query = new QueryStream(sql)

      // Set options as required
      options.batchSize = parseInt(options.batchSize) || 1000
      options.onConflictExpression = options.onConflictExpression || null

      let records = []
      let batchNumber = 1
      let recordCount = 0

      let t0 = performance.now()
      tenant.db.stream(query, (stream) => {
        stream.on('data', (record) => {
          const mappedRecord = recordMapper(record)
          records.push(mappedRecord)
          recordCount++

          if (records.length === options.batchSize) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.log(`Batch ${batchNumber} done`)
            batchNumber++
          }
        })
        stream.on('end', () => {
        // If any records are left that are not part of a batch insert here.
          if (records.length !== 0) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.log(`Batch ${batchNumber} done`)
            batchNumber++
            console.log('Total Records: ' + recordCount)
            let t1 = performance.now()
            console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
          } else {
            console.log('Total Records: ' + recordCount)
            let t1 = performance.now()
            console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
          }
        })
        stream.on('error', (error) => {
          throw error
        })
      })
        .then(data => {
          resolve()
        })
        .catch(error => {
          console.log('ERROR:', error)
          reject(error)
        })
    })
  } catch (err) {
    throw err
  }
}

Я не уверен, является ли подход, который я пробую, лучшим.Я пробовал несколько разных вещей, основываясь на документации, которую я могу найти в pg-обещании и в стримах, но без радости.

Любая помощь / совет очень ценятся.

Спасибо

Пол

Попытка 2

Ниже приведена моя вторая попытка использовать getNextData и последовательность в соответствии со страницей импорта данных.Изо всех сил пытается определить, как подключить поток к нему, чтобы вытащить только партии данных за один раз до вставки.

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {

  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new QueryStream(sql)

    function getNextData(transaction, index) {
      return new Promise(async (resolve, reject) => {
        if (index < options.batchSize) {
          let count = 1
          await transaction.stream(query, async (stream) => {
            let records = []
            await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {  
              stream.resume()
              count++
              console.log(count, streamIndex, streamData)        

              records.push(streamData[0])

              if (records.length === options.batchSize) {
                stream.pause()
                resolve(records)
              }
            }, {readChunks: true})

          })  
        }
        resolve(null)
      })
    }

    return tenant.db.tx('massive-insert', (transaction) => {
      return transaction.sequence((index) => {          
        return getNextData(transaction, index)
          .then((records) => {
            if (records > 0) {
              let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)

              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }

              const i0 = performance.now()
              return transaction.none(query)
                .then(() => {
                  let i1 = performance.now()
                  console.log('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
                })
            }
          })
      })
    })
  } catch (err) {
    throw err
  }
}

1 Ответ

0 голосов
/ 25 февраля 2019

У меня есть эта работа, использующая немного другой подход, более сфокусированный на непосредственном использовании потоков, но все еще использующий pg-обещание для работы со стороной БД.

const BatchStream = require('batched-stream')
const { performance } = require('perf_hooks')
const { Transform, Writable } = require('stream')

module.exports = async (tenant, sql, columnSet, recordMapper, options = {}) => {

  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new tenant.lib.QueryStream(sql)

    const stream = tenant.db.client.query(query)

    return new Promise((resolve, reject) => {
      // We want to process this in batches
      const batch = new BatchStream({size : options.batchSize, objectMode: true, strictMode: false})

      // We use a write stream to insert the batch into the database
      let insertDatabase = new Writable({
        objectMode: true,
        write(records, encoding, callback) {
          (async () => {

            try {
              /*
                If we have a record mapper then do it here prior to inserting the records.
                This way is much quicker than doing it as a transform stream below by
                about 10 seconds for 100,000 records
              */
              if (recordMapper) {
                records = records.map(record => recordMapper(record))
              }

              let query = tenant.lib.pgp.helpers.insert(records, columnSet)

              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }

              const i0 = performance.now()
              await tenant.db.none(query)
                .then(() => {
                  let i1 = performance.now()
                  console.log('Inserted ' + records.length + ' records in ' + ((i1 - i0) / 1000) + ' (seconds).')
                })

            } catch(e) {
              return callback(e)
            }

            callback()
          })()
        }
      })

      // Process the stream
      const t0 = performance.now()
      stream
        // Break it down into batches
        .pipe(batch)
        // Insert those batches into the database
        .pipe(insertDatabase)
        // Once we get here we are done :)
        .on('finish', () => {
          const t1 = performance.now()
          console.log('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
          resolve()
        })
        .on('error', (error) => {
          reject(error)
        })

    })
  } catch (err) {
    throw err
  }
}
...