Я хочу сделать следующее.
- Запросить большую таблицу с группировкой по запросу для суммирования значений.
- Запустите эти записи с помощью подпрограммы, чтобы добавить некоторые дополнительные данные
- Эффективно вставьте их в БД.
Я пытался сделать это с помощью pg-поток запросов для чтения данных в виде потока и последующего подсчета этих записей в пакеты, например, по 1000 за раз, и как только мы достигнем предела количества пакетов, мы используем pg-promise pgp.helpers.insert для вставки данных.
Проблема, с которой я столкнулся, заключается в том, что я не могу понять, как заставить поток приостановиться для завершения вставки перед продолжением.Особенно на on.end ()
Код, который я пробовал, ниже
const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')
const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
stream.pause()
const t0 = performance.now()
let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)
if (options.onConflictExpression) {
query += options.onConflictExpression
}
tenant.db.none(query)
.then(() => {
let t1 = performance.now()
console.log('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
stream.resume()
})
.catch(error => {
throw error
})
}
module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
try {
return new Promise((resolve, reject) => {
const query = new QueryStream(sql)
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null
let records = []
let batchNumber = 1
let recordCount = 0
let t0 = performance.now()
tenant.db.stream(query, (stream) => {
stream.on('data', (record) => {
const mappedRecord = recordMapper(record)
records.push(mappedRecord)
recordCount++
if (records.length === options.batchSize) {
batchInsertData(tenant, stream, records, columnSet, options)
records = []
console.log(`Batch ${batchNumber} done`)
batchNumber++
}
})
stream.on('end', () => {
// If any records are left that are not part of a batch insert here.
if (records.length !== 0) {
batchInsertData(tenant, stream, records, columnSet, options)
records = []
console.log(`Batch ${batchNumber} done`)
batchNumber++
console.log('Total Records: ' + recordCount)
let t1 = performance.now()
console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
} else {
console.log('Total Records: ' + recordCount)
let t1 = performance.now()
console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
}
})
stream.on('error', (error) => {
throw error
})
})
.then(data => {
resolve()
})
.catch(error => {
console.log('ERROR:', error)
reject(error)
})
})
} catch (err) {
throw err
}
}
Я не уверен, является ли подход, который я пробую, лучшим.Я пробовал несколько разных вещей, основываясь на документации, которую я могу найти в pg-обещании и в стримах, но без радости.
Любая помощь / совет очень ценятся.
Спасибо
Пол
Попытка 2
Ниже приведена моя вторая попытка использовать getNextData и последовательность в соответствии со страницей импорта данных.Изо всех сил пытается определить, как подключить поток к нему, чтобы вытащить только партии данных за один раз до вставки.
const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')
module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
try {
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null
const query = new QueryStream(sql)
function getNextData(transaction, index) {
return new Promise(async (resolve, reject) => {
if (index < options.batchSize) {
let count = 1
await transaction.stream(query, async (stream) => {
let records = []
await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {
stream.resume()
count++
console.log(count, streamIndex, streamData)
records.push(streamData[0])
if (records.length === options.batchSize) {
stream.pause()
resolve(records)
}
}, {readChunks: true})
})
}
resolve(null)
})
}
return tenant.db.tx('massive-insert', (transaction) => {
return transaction.sequence((index) => {
return getNextData(transaction, index)
.then((records) => {
if (records > 0) {
let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)
if (options.onConflictExpression) {
query += options.onConflictExpression
}
const i0 = performance.now()
return transaction.none(query)
.then(() => {
let i1 = performance.now()
console.log('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
})
}
})
})
})
} catch (err) {
throw err
}
}