Итак, у меня есть процесс, который выбирает из таблицы. Я делю мой выбор программно на 20 подвыборов. Затем я go через каждый из них выбираю и передаю его данные клиенту индексации (solr). Каждая выбранная память перепрыгивает и удерживается до тех пор, пока я не получу OOM.
Я вошел в систему, когда каждый запрос сработал, и это можно увидеть на следующих графиках:
Они коррелируют с каждым скачком в этом графике памяти:
14 из 20 запросов, выполненных до того, как я пришел.
Я вижу такое поведение с кодом, который похож, но с дельтой, которая запускается каждые 15 минут. Каждая дельта содержит некоторую память, пока в конечном итоге сервер не обработает sh с помощью OOM (который восстанавливает)
Я пытался отследить проблемы с прошлым дельты, но сдался и просто создал способ изящно перезапустить. Чего мне здесь не хватает?
Вот вся моя цепочка кода, которая делает эту работу ... Я знаю, что многое нужно просмотреть, но я подумал, что как можно больше подробностей поможет.
Стек библиотеки:
"node": "~11.10.1"
"knex": "^0.20.9",
"oracledb": "^4.0.0"
"camelize2": "^1.0.0"
Knex - фабрика соединений с БД
'use strict'
const objection = require('objection')
const knex = require('knex')
module.exports = function ObjectionFactory(log) {
class MyObjection extends objection.Model {
constructor() {
super()
}
static get tableName() {
return ''
}
}
MyObjection.pickJsonSchemaProperties = true
log.info('Connecting to Oracle Pluggable...', {
host: 'myHost',
username: 'myUser',
database: 'myDatabase"
})
const knexInstance = knex({
client: 'oracledb',
connection: 'connectionInfo',
pool: {
min: 0,
max: 10
},
acquireConnectionTimeout: 10000
})
process.once('SIGINT', () => {
log.info('Disconnecting from Oracle Pluggable.')
knexInstance.destroy()
.then(() => process.exit(0))
.catch(() => process.exit(1))
})
// Shut down cleanly for nodemon
process.once('SIGUSR2', () => {
log.info('Disconnecting from Oracle Pluggable')
knexInstance.destroy()
.then(() => process.kill(process.pid, 'SIGUSR2'))
.catch(() => process.kill(process.pid, 'SIGUSR2'))
})
const knexBoundClass = MyObjection.bindKnex(knexInstance)
knexBoundClass.tag = 'Oracle Connection'
return knexBoundClass
}
Мой код выбора потока:
module.exports = function oracleStream(log, MyObjection) {
const knex = MyObjection.knex()
const fetchArraySize = 10000
const outFormat = oracledb.OBJECT
return {
selectStream
}
async function selectStream(sql, bindings = [], fetchSize = fetchArraySize) {
let connection = await knex.client.acquireConnection()
log.info(`Fetch size is set to ${fetchSize}`)
let select = connection.queryStream(sql, bindings, {
fetchArraySize: fetchSize,
outFormat: outFormat
})
select.on('error', (err) => {
log.error('Oracle Error Event', err)
knex.client.releaseConnection(connection)
})
select.on('end', () => {
log.info('Destroying the Stream')
select.destroy()
})
select.on('close', () => {
log.info('Oracle Close Event')
knex.client.releaseConnection(connection)
select = null
connection = null
})
return select
}
}
Мой код конвейера индекса / потока
async function indexJob() {
const reindexStartTime = new moment().local()
let rowCount = 0
log.info('Reindex Started at', reindexStartTime.format())
let queryNumber = 1
const partitionedQueries = ['Select * from table where 1=1', 'Select * from table where 2=2', 'Select * from table where 3=3'] //There would be 20 queries in this array
let partitionedQueriesLength = partitionedQueries.length
while (partitionedQueries.length > 0) {
let query = partitionedQueries.pop()
log.info('RUNNING Query', {
queryNumber: `${queryNumber++} of ${partitionedQueriesLength}`,
query: query
})
let databaseStream = await oracleStream.selectStream(query, [], 10000) //10k represents the oracle fetch size
databaseStream.on('data', () => {
rowCount++
})
let logEveryFiveSec = setInterval(() => {
log.info('Status: ', getReindexInfo(reindexStartTime, rowCount))
}, 5000)
try {
let pipeline = util.promisify(stream.pipeline)
await pipeline(
databaseStream,
camelizeAndStringify(),
streamReindex(core)
)
} catch (err) {
databaseStream.destroy(err)
throw new JobFailedError(err)
} finally {
databaseStream.destroy()
clearInterval(logEveryFiveSec)
}
}
}
function camelizeAndStringify() {
let first = true
const serialize = new Transform({
objectMode: true,
highWaterMark: 1000,
transform(chunk, encoding, callback) {
if (first) {
this.push('[' + JSON.stringify(camelize(chunk)))
first = false
} else {
this.push(',' + JSON.stringify(camelize(chunk)))
}
callback()
chunk = null
},
flush(callback) {
this.push(']')
callback()
}
})
return serialize
}
function streamReindex(core) {
const updateUrl = baseUrl + core + '/update'
const options = {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
'auth': `${user.username}:${user.password}`,
}
let postStream = https.request(updateUrl, options, (res) => {
let response = {
status: {
code: res.statusCode,
message: res.statusMessage
},
headers: res.headers,
}
if (res.statusCode !== 200) {
postStream.destroy(new Error(JSON.stringify(response)))
}
})
postStream.on('error', (err)=>{
throw new Error(err)
})
postStream.on('socket', (socket) => {
socket.setKeepAlive(true, 110000)
})
return postStream
}
РЕДАКТИРОВАТЬ 1: Я попытался удалить knex из уравнения, выполнив единственное соединение с моей базой данных с библиотекой oracle. К сожалению, я все еще вижу такое же поведение. Вот как я изменил свой выбор на , а не использовать knex
async function selectStream(sql, bindings = [], fetchSize = fetchArraySize) {
const connectionInfo = {
user: info.user,
password: info.password,
connectString: info.host +'/'+info.database
}
const connection = await oracledb.getConnection(connectionInfo)
log.info('Connection was successful!')
log.info(`Fetch size is set to ${fetchSize}`)
let select = connection.queryStream(sql, bindings, {
fetchArraySize: fetchSize,
outFormat: outFormat
})
select.on('error', async (err) => {
log.error('Oracle Error Event', err)
await connection.close()
})
select.on('end', () => {
log.info('Destroying the Stream')
select.destroy()
})
select.on('close', async () => {
log.info('Oracle Close Event')
await connection.close()
})
return select
}