Node.js Streams - планировать события во времени без противодавления - PullRequest
0 голосов
/ 30 сентября 2018

Используя Node.js, я пытаюсь «воспроизвести» простой CSV-файл, который содержит точку с плавающей запятой в одном столбце и числовое значение в другом.Идея состоит в том, чтобы выводить значения в моменты времени, указанные в первом столбце после запуска сценария.

Это код, который я написал для этого.

const NS_PER_SEC = 1e9;

const {chain}  = require('stream-chain');

const {parser} = require('stream-csv-as-json');
const {streamValues} = require('stream-json/streamers/StreamValues');

const fs   = require('fs');
const zlib = require('zlib');

// CSV file streaming pipeline
const pipeline = chain([
  fs.createReadStream('timed-data.csv'),
  parser({separator: ','}),
  streamValues(),
  data => {
    const value = data.value;
    return {t: value[0], pitch: value[1]}
  }
]);

var startTime = process.hrtime();

// Output a line every second for testing
setInterval(()=>{
  let t = process.hrtime(startTime)[0]
  console.log('----------------' + t + '-----------------------')
}, 1000)


pipeline.on('data', (d) => {
  // get time
  let hrtDiff = process.hrtime(startTime);
  // convert hrtime array to float sec
  let hrtDiffFloat = (hrtDiff[0] + (hrtDiff[1] / NS_PER_SEC));
  // calcualte time difference from now
  var diff = (d.t - hrtDiffFloat)*1000;
  console.log('Setting timeout to '+ diff + 'ms');
  setTimeout((d)=>console.log('data: '+d.t+', '+d.pitch), diff, d);
});

В настоящее время тысячи выполненийзапланированы как узел летит через файл.Позже события начинают срабатывать.

Вопрос: Как запланировать выполнение событий по одному (или нескольким) за раз, чтобы новые элементы извлекались из потока после запланированногоказни закончились?

(кроме военных преступлений, таких как while (Date.now () )

Обратите внимание, что я новичок в Node, так что шансыявляются ли потоки неправильным инструментом для этой задачи.

Пример данных: (реальные данные содержат сотни значений в секунду)

time,val
1.886621315,0
2.757369614,186.920
3.848707482,178.005
4.440816326,0
4.992290249,154.440
5.932698412,0
7.845260770,0
9.027936507,240.235
10.164172335,264.044
11.625487528,198.861
13.526439909,249.802
14.841088435,0
15.243628117,173.235
15.847346938,198.861
17.250612244,223.481
18.521541950,218.313
20.495238095,264.044
21.796371882,348.087
22.134240362,278.755
26.769705215,249.083

Вывод на консоль для данных примера:

Mikulass-MBP:experiments mikulas$ node strem-csv.js 
Setting timeout to NaNms
Setting timeout to 1882.043799ms
Setting timeout to 2752.6014609999997ms
Setting timeout to 3843.824312ms
Setting timeout to 4435.851658ms
Setting timeout to 4987.248032ms
Setting timeout to 5927.584522ms
Setting timeout to 7840.075599000001ms
Setting timeout to 9022.677742ms
Setting timeout to 10158.839757ms
Setting timeout to 11620.070245ms
Setting timeout to 13520.945921ms
Setting timeout to 14835.519640999999ms
Setting timeout to 15237.984088ms
Setting timeout to 15841.62652ms
Setting timeout to 17244.816256ms
Setting timeout to 18515.670276999997ms
Setting timeout to 20489.291373ms
Setting timeout to 21790.351713ms
Setting timeout to 22128.146255ms
Setting timeout to 26763.537879999996ms
data: time, val
----------------1-----------------------
data: 1.886621315, 0
----------------2-----------------------
data: 2.757369614, 186.920
----------------3-----------------------
data: 3.848707482, 178.005
----------------4-----------------------
data: 4.440816326, 0
data: 4.992290249, 154.440
----------------5-----------------------
data: 5.932698412, 0
----------------6-----------------------
----------------7-----------------------
data: 7.845260770, 0
----------------8-----------------------
data: 9.027936507, 240.235
----------------9-----------------------
----------------10-----------------------
data: 10.164172335, 264.044
----------------11-----------------------
data: 11.625487528, 198.861
----------------12-----------------------
----------------13-----------------------
data: 13.526439909, 249.802
----------------14-----------------------
data: 14.841088435, 0
----------------15-----------------------
data: 15.243628117, 173.235
data: 15.847346938, 198.861
----------------16-----------------------
----------------17-----------------------
data: 17.250612244, 223.481
----------------18-----------------------
data: 18.521541950, 218.313
----------------19-----------------------
----------------20-----------------------
data: 20.495238095, 264.044
----------------21-----------------------
data: 21.796371882, 348.087
----------------22-----------------------
data: 22.134240362, 278.755
----------------23-----------------------
----------------24-----------------------
----------------25-----------------------
----------------26-----------------------
data: 26.769705215, 249.083
----------------27-----------------------
----------------28-----------------------

1 Ответ

0 голосов
/ 30 сентября 2018

Вы можете добавить асинхронную функцию в цепочку:

const timer = ms => new Promise(res => setTimeout(res, ms));

const pipeline = chain([
 fs.createReadStream('timed-data.csv'),
 parser({separator: ','}),
 streamValues(),
 async data => {
   const [time, pitch] = data.value;
   await timer(time * 1000); // "sleep"
   return {time, pitch };
 }
]);

, которая создает противодавление, что на самом деле является наиболее элегантным вариантом, поскольку в противном случае вам пришлось бы загружать все данные в буфер.

...