Мне нужна небольшая проверка работоспособности.
Я использую Node 11.10.1
У меня есть процесс, который читает из базы данных oracle с использованием библиотеки nodejs oracledb , Существует функция потоковой передачи, в которой я выбираю * и передаю результаты в виде пакетов по 10 тыс. Объектов. Затем я отправляю эти данные в индексатор через https. Поток объекта внедряется в функцию конвейера.
Некоторое время я использовал следующий код. Я пытаюсь отладить пропускную способность. Иногда я вижу около 2 тыс. Документов в секунду, обрабатываемых по этому конвейеру. В большинстве случаев я вижу <150. Прежде чем перейти к отладке моего сервера индексов. Я хочу убедиться, что эти функции закодированы правильно. </p>
async function streamReindex(databaseStream) {
let pipeline = util.promisify(stream.pipeline)
await pipeline(
selectStream,// "oracledb": "^4.0.0", stream function
camelize.camelizeStream(), //"camelize2": "^1.0.0", library wrapped in ,"through2": "^3.0.1" library to make it an object stream
JSONStream.stringify(), //"JSONStream": "^1.3.5"
reindexClient.streamReindex(core)
)
}
// reindexClient code.
function streamReindex(core) {
const updateUrl = baseUrl + core + '/update'
const options = {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
'auth': `${user.username}:${user.password}`,
}
let postStream = https.request(updateUrl, options, (res) => {
let response = {
status: {
code: res.statusCode,
message: res.statusMessage
},
headers: res.headers,
}
if (res.statusCode !== 200) {
postStream.destroy(new Error(JSON.stringify(response)))
}
})
postStream.on('error', (err)=>{
throw new Error(err)
})
postStream.on('socket', (socket) => {
socket.setKeepAlive(true, 240000)
})
return postStream
}
async function selectStream(sql, bindings = [], fetchSize =
fetchArraySize) {
let connection = await knex.client.acquireConnection()
log.info(`Fetch size is set to ${fetchSize}`)
let select = connection.queryStream(sql, bindings, {
fetchArraySize: fetchSize,
outFormat: outFormat
})
select.on('error', (err) => {
log.error('Oracle Error Event', err)
knex.client.releaseConnection(connection)
})
select.on('close', () => {
log.info('Oracle Close Event')
knex.client.releaseConnection(connection)
select = null
connection = null
})
return select
}
Если я удаляю функцию reindexClient.streamReindex (core) из конвейера. Я вижу пропускную способность ~ 5 тыс. Объектов в секунду. Я изучал функциональность highwatermark потоков, но, похоже, не могу понять, как применить его к postStream. Если я отправляю сообщение из console.log, он также не говорит, что находится в объектном режиме. Это означает, что его верхняя метка в байтах имеет низкий порог, я считаю.
Если вам понадобится дополнительная информация, я постараюсь предоставить как можно больше.