NodeJs Потоки, конвейеры и сообщения https - PullRequest
0 голосов
/ 18 марта 2020

Мне нужна небольшая проверка работоспособности.

Я использую Node 11.10.1

У меня есть процесс, который читает из базы данных oracle с использованием библиотеки nodejs oracledb , Существует функция потоковой передачи, в которой я выбираю * и передаю результаты в виде пакетов по 10 тыс. Объектов. Затем я отправляю эти данные в индексатор через https. Поток объекта внедряется в функцию конвейера.

Некоторое время я использовал следующий код. Я пытаюсь отладить пропускную способность. Иногда я вижу около 2 тыс. Документов в секунду, обрабатываемых по этому конвейеру. В большинстве случаев я вижу <150. Прежде чем перейти к отладке моего сервера индексов. Я хочу убедиться, что эти функции закодированы правильно. </p>

  async function streamReindex(databaseStream) {
    let pipeline = util.promisify(stream.pipeline)
    await pipeline(
      selectStream,//  "oracledb": "^4.0.0", stream function
      camelize.camelizeStream(), //"camelize2": "^1.0.0", library wrapped in ,"through2": "^3.0.1" library to make it an object stream
      JSONStream.stringify(), //"JSONStream": "^1.3.5"
      reindexClient.streamReindex(core)
    )
  }

// reindexClient code.

  function streamReindex(core) {
    const updateUrl = baseUrl + core + '/update'
    const options = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      'auth': `${user.username}:${user.password}`,
    }
    let postStream = https.request(updateUrl, options, (res) => {
      let response = {
        status: {
          code: res.statusCode,
          message: res.statusMessage
        },
        headers: res.headers,
      }
      if (res.statusCode !== 200) {
        postStream.destroy(new Error(JSON.stringify(response)))
      }
    })
    postStream.on('error', (err)=>{
      throw new Error(err)
    })
    postStream.on('socket', (socket) => {
      socket.setKeepAlive(true, 240000)
    })
    return postStream
  }


  async function selectStream(sql, bindings = [], fetchSize = 
     fetchArraySize) {
     let connection = await knex.client.acquireConnection()

    log.info(`Fetch size is set to ${fetchSize}`)
    let select = connection.queryStream(sql, bindings, {
      fetchArraySize: fetchSize,
      outFormat: outFormat
    })

    select.on('error', (err) => {
      log.error('Oracle Error Event', err)
      knex.client.releaseConnection(connection)
    })

    select.on('close', () => {
      log.info('Oracle Close Event')
      knex.client.releaseConnection(connection)
      select = null
      connection = null
    })

    return select
  }

Если я удаляю функцию reindexClient.streamReindex (core) из конвейера. Я вижу пропускную способность ~ 5 тыс. Объектов в секунду. Я изучал функциональность highwatermark потоков, но, похоже, не могу понять, как применить его к postStream. Если я отправляю сообщение из console.log, он также не говорит, что находится в объектном режиме. Это означает, что его верхняя метка в байтах имеет низкий порог, я считаю.

Если вам понадобится дополнительная информация, я постараюсь предоставить как можно больше.

Ответы [ 2 ]

1 голос
/ 20 марта 2020

Хотя ваша проблема, похоже, не с oracledb, я помещаю это здесь, чтобы я мог отформатировать код. Вы можете получить некоторое преимущество в производительности от настройки потока oracledb, например:

   diff --git a/lib/queryStream.js b/lib/queryStream.js
   index 08ddc720..11953e4b 100644
   --- a/lib/queryStream.js
   +++ b/lib/queryStream.js
   @@ -24,7 +24,7 @@ const { Readable } = require('stream');
    class QueryStream extends Readable {

      constructor(rs) {
   -    super({ objectMode: true });
   +    super({ objectMode: true, highWaterMark: 64 });  // choose your own value
    this._fetching = false;
    this._numRows = 0;

. Будет приветствоваться PR, позволяющий установить верхний водяной знак как параметр queryStream().

0 голосов
/ 17 апреля 2020

Так что я увеличил свою пропускную способность почти на 500 / документов в секунду, реализовав функцию JSONStream.stringify () самостоятельно. Так как это не позволило мне установить водяной знак. Как только я это сделал, я смог действительно увеличить его, но я мог использовать всю свою память. Установка с помощью следующего кода дала мне стабильный объем памяти и большую пропускную способность. Я также избавился от библиотеки through2 и поместил верблюжий функционал в свой stringif Transfrom. Большую часть кода и объяснения можно найти здесь: https://blog.dmcquay.com/2017/09/06/node-stream-db-results-with-transform.html

Вот код:

  function camelizeAndStringify() {
    let first = true
    const serialize = new Transform({
      objectMode: true,
      highWaterMark: 1000,
      transform(chunk, encoding, callback) {
        if (first) {
          this.push('[' + JSON.stringify(camelize(chunk)))
          first = false
        } else {
          this.push(',' + JSON.stringify(camelize(chunk)))
        }
        callback()
        chunk = null
      },
      flush(callback) {
        this.push(']')
        callback()
      }
    })
    return serialize
  }
...