Уничтожение потока pg-prom не работает после Node.js обновления - PullRequest
0 голосов
/ 05 августа 2020

Я использую поток pg-prom (который использует pg-query-stream) для запроса большой таблицы, и когда у меня есть желаемое количество строк, я использую destroy (метод pg-query-stream), чтобы закрыть / завершить поток .

Это работало на Node.js v10, но после обновления до v14 запрос не завершился. Некоторые наблюдения:

  • Я использую событие 'end', чтобы завершить буферизованные данные. После обновления это никогда не вызывается.
  • Если я not 'уничтожаю' поток, он работает.
  • Так что, похоже, pg-prom как-то не справляется с уничтожением stream / cursor.
  • Но проблема возникла после обновления Node.js (не обновления pg-обещания).

Кто-нибудь знает, как обновление Node.js может повлиять это и как его решить?

Я нашел один способ решить эту проблему, но чувствую себя взломанным:

  • Я подключаю функцию, которая преобразует входящие строки.
  • В этой функции я вызвал метод 'destroy'.
  • Если я дополнительно испущу событие 'end', тогда оно сработает.
  • Он запускает конечное событие и освобождает ожидаемое обещание.
  • Но если pg-обещание освобождает соединение и курсор, я не уверен.

Вот код:

  streamQuery: async function (query, functions) {
    try {
      var _buffer = [];
      var data = [];
      const qs = new QueryStream(query);
      var result = await db.stream(qs, s => {
        // Swap buffer
        const swap = function () {
          for (var i = 0; i < functions.length; i++) {
            _buffer = df[functions[i].name](_buffer, functions[i].params)
          }
          data.push(..._buffer);
          _buffer = []
        }
        // Parse incoming rows
        const { Transform } = require('stream');
        const parse = new Transform({
          objectMode: true,
          transform(chunk, encoding, callback) {
            _buffer.push(chunk)
            if (_buffer.length >= config.wrangling.bufferSize) {
              swap();
              // If we've enought rows destroy/close the stream.
              if (data.length >= config.wrangling.limit) {
                //s.destroy(); // works in Node.js V10, not in V14.
                s.destroy().emit( "end" ); // works in V14
              }
            } 
            callback();
          }
        });
        s.on('end', () => {
          if (_buffer.length > 0) swap();
        })
        s.pipe(parse)
      })
      console.log(
        'Total rows processed:', result.processed,
        'Rows after parsing:', data.length,
        'Duration in milliseconds:', result.duration);
      return data;
    } catch (err) {
      utils.log.error('dataService db-promise-stream-query', err);
    }
  }
...