Я использую поток pg-prom (который использует pg-query-stream) для запроса большой таблицы, и когда у меня есть желаемое количество строк, я использую destroy (метод pg-query-stream), чтобы закрыть / завершить поток .
Это работало на Node.js v10, но после обновления до v14 запрос не завершился. Некоторые наблюдения:
- Я использую событие 'end', чтобы завершить буферизованные данные. После обновления это никогда не вызывается.
- Если я not 'уничтожаю' поток, он работает.
- Так что, похоже, pg-prom как-то не справляется с уничтожением stream / cursor.
- Но проблема возникла после обновления Node.js (не обновления pg-обещания).
Кто-нибудь знает, как обновление Node.js может повлиять это и как его решить?
Я нашел один способ решить эту проблему, но чувствую себя взломанным:
- Я подключаю функцию, которая преобразует входящие строки.
- В этой функции я вызвал метод 'destroy'.
- Если я дополнительно испущу событие 'end', тогда оно сработает.
- Он запускает конечное событие и освобождает ожидаемое обещание.
- Но если pg-обещание освобождает соединение и курсор, я не уверен.
Вот код:
streamQuery: async function (query, functions) {
try {
var _buffer = [];
var data = [];
const qs = new QueryStream(query);
var result = await db.stream(qs, s => {
// Swap buffer
const swap = function () {
for (var i = 0; i < functions.length; i++) {
_buffer = df[functions[i].name](_buffer, functions[i].params)
}
data.push(..._buffer);
_buffer = []
}
// Parse incoming rows
const { Transform } = require('stream');
const parse = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
_buffer.push(chunk)
if (_buffer.length >= config.wrangling.bufferSize) {
swap();
// If we've enought rows destroy/close the stream.
if (data.length >= config.wrangling.limit) {
//s.destroy(); // works in Node.js V10, not in V14.
s.destroy().emit( "end" ); // works in V14
}
}
callback();
}
});
s.on('end', () => {
if (_buffer.length > 0) swap();
})
s.pipe(parse)
})
console.log(
'Total rows processed:', result.processed,
'Rows after parsing:', data.length,
'Duration in milliseconds:', result.duration);
return data;
} catch (err) {
utils.log.error('dataService db-promise-stream-query', err);
}
}