Nodejs Streams - Помогите найти утечку памяти - PullRequest
0 голосов
/ 16 апреля 2020

Итак, у меня есть процесс, который выбирает из таблицы. Я делю мой выбор программно на 20 подвыборов. Затем я go через каждый из них выбираю и передаю его данные клиенту индексации (solr). Каждая выбранная память перепрыгивает и удерживается до тех пор, пока я не получу OOM.

Я вошел в систему, когда каждый запрос сработал, и это можно увидеть на следующих графиках: Each column indicates an instance of a new query starting

Они коррелируют с каждым скачком в этом графике памяти:

Each jump is a new query

14 из 20 запросов, выполненных до того, как я пришел.

Я вижу такое поведение с кодом, который похож, но с дельтой, которая запускается каждые 15 минут. Каждая дельта содержит некоторую память, пока в конечном итоге сервер не обработает sh с помощью OOM (который восстанавливает)

Я пытался отследить проблемы с прошлым дельты, но сдался и просто создал способ изящно перезапустить. Чего мне здесь не хватает?

Вот вся моя цепочка кода, которая делает эту работу ... Я знаю, что многое нужно просмотреть, но я подумал, что как можно больше подробностей поможет.

Стек библиотеки:

"node": "~11.10.1"
"knex": "^0.20.9",
"oracledb": "^4.0.0"
"camelize2": "^1.0.0"

Knex - фабрика соединений с БД

'use strict'

const objection = require('objection')
const knex = require('knex')


module.exports = function ObjectionFactory(log) {
  class MyObjection extends objection.Model {
    constructor() {
      super()
    }
    static get tableName() {
      return ''
    }
  }

  MyObjection.pickJsonSchemaProperties = true

  log.info('Connecting to Oracle Pluggable...', {
    host: 'myHost',
    username: 'myUser',
    database: 'myDatabase"
  })

  const knexInstance = knex({
    client: 'oracledb',
    connection: 'connectionInfo',
    pool: {
      min: 0,
      max: 10
    },
    acquireConnectionTimeout: 10000
  })

  process.once('SIGINT', () => {
    log.info('Disconnecting from Oracle Pluggable.')
    knexInstance.destroy()
      .then(() => process.exit(0))
      .catch(() => process.exit(1))
  })
  // Shut down cleanly for nodemon
  process.once('SIGUSR2', () => {
    log.info('Disconnecting from Oracle Pluggable')
    knexInstance.destroy()
      .then(() => process.kill(process.pid, 'SIGUSR2'))
      .catch(() => process.kill(process.pid, 'SIGUSR2'))
  })

  const knexBoundClass = MyObjection.bindKnex(knexInstance)
  knexBoundClass.tag = 'Oracle Connection'
  return knexBoundClass
}

Мой код выбора потока:

module.exports = function oracleStream(log, MyObjection) {

  const knex = MyObjection.knex()
  const fetchArraySize = 10000
  const outFormat = oracledb.OBJECT

  return {
    selectStream
  }

  async function selectStream(sql, bindings = [], fetchSize = fetchArraySize) {
    let connection = await knex.client.acquireConnection()

    log.info(`Fetch size is set to ${fetchSize}`)
    let select = connection.queryStream(sql, bindings, {
      fetchArraySize: fetchSize,
      outFormat: outFormat
    })

    select.on('error', (err) => {
      log.error('Oracle Error Event', err)
      knex.client.releaseConnection(connection)
    })

    select.on('end', () => {
      log.info('Destroying the Stream')
      select.destroy()
    })

    select.on('close', () => {
      log.info('Oracle Close Event')
      knex.client.releaseConnection(connection)
      select = null
      connection = null
    })

    return select
  }

}

Мой код конвейера индекса / потока

async function indexJob() {
    const reindexStartTime = new moment().local()

    let rowCount = 0
    log.info('Reindex Started at', reindexStartTime.format())
    let queryNumber = 1
    const partitionedQueries = ['Select * from table where 1=1', 'Select * from table where 2=2', 'Select * from table where 3=3'] //There  would be  20 queries in this array
    let partitionedQueriesLength = partitionedQueries.length

    while (partitionedQueries.length > 0) {
      let query = partitionedQueries.pop()

      log.info('RUNNING Query', {
        queryNumber: `${queryNumber++} of ${partitionedQueriesLength}`,
        query: query
      })

      let databaseStream = await oracleStream.selectStream(query, [], 10000) //10k represents the oracle fetch size

      databaseStream.on('data', () => {
        rowCount++
      })

      let logEveryFiveSec = setInterval(() => {
        log.info('Status: ', getReindexInfo(reindexStartTime, rowCount))
      }, 5000)

      try {
        let pipeline = util.promisify(stream.pipeline)
        await pipeline(
          databaseStream,
          camelizeAndStringify(),
          streamReindex(core)
        )
      } catch (err) {
        databaseStream.destroy(err)
        throw new JobFailedError(err)
      } finally {
        databaseStream.destroy()
        clearInterval(logEveryFiveSec)
      }
    }
  }

  function camelizeAndStringify() {
    let first = true
    const serialize = new Transform({
      objectMode: true,
      highWaterMark: 1000,
      transform(chunk, encoding, callback) {
        if (first) {
          this.push('[' + JSON.stringify(camelize(chunk)))
          first = false
        } else {
          this.push(',' + JSON.stringify(camelize(chunk)))
        }
        callback()
        chunk = null
      },
      flush(callback) {
        this.push(']')
        callback()
      }
    })
    return serialize
  }


function streamReindex(core) {
    const updateUrl = baseUrl + core + '/update'
    const options = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      'auth': `${user.username}:${user.password}`,
    }
    let postStream = https.request(updateUrl, options, (res) => {
      let response = {
        status: {
          code: res.statusCode,
          message: res.statusMessage
        },
        headers: res.headers,
      }
      if (res.statusCode !== 200) {
        postStream.destroy(new Error(JSON.stringify(response)))
      }
    })
    postStream.on('error', (err)=>{
      throw new Error(err)
    })
    postStream.on('socket', (socket) => {
      socket.setKeepAlive(true, 110000)
    })
    return postStream
  }

РЕДАКТИРОВАТЬ 1: Я попытался удалить knex из уравнения, выполнив единственное соединение с моей базой данных с библиотекой oracle. К сожалению, я все еще вижу такое же поведение. Вот как я изменил свой выбор на , а не использовать knex

  async function selectStream(sql, bindings = [], fetchSize = fetchArraySize) {

    const connectionInfo = {
      user: info.user,
      password: info.password,
      connectString: info.host +'/'+info.database
    }

    const connection = await oracledb.getConnection(connectionInfo)

    log.info('Connection was successful!')

    log.info(`Fetch size is set to ${fetchSize}`)
    let select = connection.queryStream(sql, bindings, {
      fetchArraySize: fetchSize,
      outFormat: outFormat
    })

    select.on('error', async (err) => {
      log.error('Oracle Error Event', err)
      await connection.close()
    })

    select.on('end', () => {
      log.info('Destroying the Stream')
      select.destroy()
    })

    select.on('close', async () => {
      log.info('Oracle Close Event')
      await connection.close()
    })

    return select
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...