createWriteStream в узле, кажется, выполняет событие 'end' перед 'data' - PullRequest
0 голосов
/ 07 апреля 2020

Я не могу понять, как событие l oop обрабатывает мой фрагмент. Я пытаюсь достичь

  • для чтения из csv
  • загрузки ресурса, найденного в csv
  • , загрузки его в s3
  • запишите его в новый CSV-файл
const readAndUpload =  () => {
    fs.createReadStream('filename.csv')
    .pipe(csv())
    .on('data', ((row: any) => {
        const file = fs.createWriteStream("file.jpg");
        var url = new URL(row.imageURL)
        // choose whether to make an http or https request
        let client = (url.protocol=="https:") ? https : http
        const request = client.get(row.imageURL, function(response:any) {
            // file save
            response.pipe(file);
            console.log('file saved')
            let filePath = "file.jpg";
            let params = {
                Bucket: 'bucket-name',
                Body : fs.createReadStream(filePath),
                Key : "filename.jpg"
            };
            // upload to s3
            s3.upload(params, function (err: any, data: any) {
                //handle error
                if (err) {
                  console.log("Error", err);
                }

                //success
                if (data) {
                  console.log("Uploaded in:", data.Location);
                  row.imageURL = data.Location
                  writeData.push(row)
                //   console.log(writeData)
                }
              });
        });
    }))
    .on('end', () => {
        console.log("done reading")
        const csvWriter = createCsvWriter({
            path: 'out.csv',
            header: [
                {id: 'id', title: 'some title'}
            ]
        });
        csvWriter
            .writeRecords(writeData)
            .then(() => console.log("The CSV file was written successfully"))
    })
}

Исходя из добавленных мною операторов журнала, done reading и The CSV file was written successfully печатаются до file saved. Насколько я понимаю, событие end вызывается после события data, поэтому я не уверен в том, где я ошибаюсь.

Спасибо, что прочитали!

1 Ответ

0 голосов
/ 07 апреля 2020

Я не уверен, является ли это частью проблемы или нет, но у вас есть дополнительный набор символов в этой части кода. Измените это:

.on('data', ((row: any) => {
     .....
})).on('end', () => {

на следующее:

.on('data', (row: any) => {
     .....
}).on('end', () => {

И, если обработчики событий настроены правильно, ваш обработчик событий .on('data', ...) будет вызываться перед .on('end', ....) для тот же поток. Если вы введете это:

console.log('at start of data event handler');

в качестве первой строки в этом обработчике событий, вы увидите, что он вызывается первым.

Но ваш обработчик событий данных использует несколько асинхронных вызовов и ничего Наличие в вашем коде заставляет событие end ждать завершения всей обработки в обработчике data. Итак, поскольку эта обработка занимает некоторое время, естественно, что событие end произойдет до того, как вы завершите выполнение всего этого асинхронного кода для события data.


Кроме того, если вы когда-либо может иметь более одного data события (которое обычно происходит), вы будете иметь несколько data событий в полете одновременно, и, поскольку вы используете фиксированное имя файла, они, вероятно, будут перезаписывать друг друга .


Обычный способ решить что-то вроде этого - stream.pause() приостановить поток чтения в начале обработки события data, а затем, когда все ваши асинхронные операции выполнены, вы можете затем stream.resume(), чтобы снова начать движение.

Вам нужно будет получить правильный поток, чтобы приостановить и возобновить. Вы можете сделать что-то вроде этого:

let stream = fs.createReadStream('filename.csv')
 .pipe(csv());

stream.on('data', ((row: any) => {
   stream.pause();
   ....
});

Затем, внутри вашего s3.upload() обратного вызова, вы можете позвонить stream.resume(). Вам также понадобится намного, намного лучшая обработка ошибок, чем у вас, иначе вещи просто застрянут, если вы получите ошибку.

Похоже, у вас также есть другие проблемы параллелизма, когда вы звоните:

response.pipe(file);

И затем вы пытаетесь использовать file, фактически не дожидаясь выполнения операции .pipe() (которая также является асинхронной). В целом, вся эта логика c действительно нуждается в серьезной очистке. Я не понимаю, что именно вы пытаетесь сделать на разных этапах, чтобы узнать, как написать абсолютно чистую и более простую версию.

...