Почему мой приостановленный читаемый поток не остается приостановленным? - PullRequest
0 голосов
/ 09 июля 2020

Моя цель: в Node.js передать поток large-i sh JSON (144 МБ) через выборку, разобрать его на элементы, а затем сохранить каждый элемент на моем сервере Parse Platform. Я хочу передавать только фрагмент после того, как предыдущий фрагмент был проанализирован и успешно сохранен. Я обнаружил, что приведенный ниже код не приостанавливает поток до тех пор, пока не появится предыдущий «await newObject.save ();» линия завершена.

Есть база данных, которую мне нужно обновлять каждые 56 дней. Я ограничен 30 запросами в секунду облачным сервером. Некоторые из отдельных объектов являются крупными sh, то есть у меня может быть более 30 одновременно в процессе сохранения, поэтому некоторые из них будут отклонены.

  function schmoo(url){
  let count = 0;
  let downloadStart = Date.now();
  console.log("download starting at: ", downloadStart);
  fetch(url)
    .then(response =>  {
      const reader = response.body;
      reader.pipe(JSONStream.parse('features.*'))
          .on('data', async function (feature) {
            reader.pause();
            try{
                  count++;
                  await sleep(numberOfMilliseconds);
                  if(shouldAdd(feature)){
                          if(feature.geometry.type == "Polygon"){
                              let newObject = processFeature(feature, feature.geometry.coordinates[0]);
                              await newObject.save();
                          }else if (feature.geometry.type == "MultiPolygon"){
                              let i = 0;
                              for(i = 0; i < feature.geometry.coordinates.length; i++){
                                  let el = feature.geometry.coordinates[i];
                                  let newObject = processFeature(feature, el[0]);
                                  await newObject.save();
                              }
                              console.log("found multipolygon");
                            }else {
                              console.log("not a polygon or multipolygon")
                              }
                    }
              } catch(error){
                  console.log(error);
                }
              reader.resume();
              })

          .on('error', function (err) {
            throw err;
          })
          .on('end', function (){
            console.log("parsed through", count, "items finished after", Date.now()-start);
          });
    });
  }

1 Ответ

0 голосов
/ 14 июля 2020

Использование потока NodeJS хорошо подходит для описания проблемы. Противодавление потоковой библиотеки заставит каждую часть работы обрабатываться последовательно.

Пример, предоставленный OP, объединяет с использованием события data в потоковом режиме с обработчиком событий asyn c. Это указывает NodeJS на выполнение зарегистрированного обработчика событий каждый раз, когда в конвейере появляется новый фрагмент данных / JSON. Однако NodeJS не будет автоматически ожидать функцию asyn c. Таким образом, обработчик событий asyn c выполняется последовательно для каждого фрагмента, но не будет ждать завершения обработки предыдущих фрагментов, прежде чем перейти к следующему фрагменту.

NodeJS (stream.Writable при объединении асинхронной c обработки фрагментов потока с обратным давлением. Класс Writable имеет функцию callback, которая указывает, когда каждый фрагмент завершил обработку:

import stream from "stream";
import util from "util";

let downloadStart = Date.now();

fetch(url)
  .then(response =>  {
const reader = response.body;

return util
  .promisify(stream.pipeline)(
    reader, 
    new stream.Writable({
      write: (feature, encoding, callback) => {
        try {
          await sleep(numberOfMilliseconds);
          if(shouldAdd(feature)){
            // do work
          }
          callback();
        }
        catch(error){
          callback(error);
        }
      }
    })
  )
  .then(()=>{
    console.log("parsed through", count, "items finished after", Date.now()-start);
  })
  });

Если вы работаете с NodeJS v13 +, вы можете использовать асинхронный c итератор вместо Writable, что, как правило, делает этот вариант использования более кратким:

import stream from "stream";
import util from "util";

let downloadStart = Date.now();

fetch(url)
  .then(response =>  {
    const reader = response.body;

    return util
      .promisify(stream.pipeline)(reader, async (source) => {
        for await (const feature of source){
          await sleep(numberOfMilliseconds);
          if(shouldAdd(feature)){
            // do work
          }
        }
      })
      .then(()=>{
        console.log("parsed through", count, "items finished after", Date.now()-start);
      })
  });
...