Я не уверен, является ли это частью проблемы или нет, но у вас есть дополнительный набор символов в этой части кода. Измените это:
.on('data', ((row: any) => {
.....
})).on('end', () => {
на следующее:
.on('data', (row: any) => {
.....
}).on('end', () => {
И, если обработчики событий настроены правильно, ваш обработчик событий .on('data', ...)
будет вызываться перед .on('end', ....)
для тот же поток. Если вы введете это:
console.log('at start of data event handler');
в качестве первой строки в этом обработчике событий, вы увидите, что он вызывается первым.
Но ваш обработчик событий данных использует несколько асинхронных вызовов и ничего Наличие в вашем коде заставляет событие end
ждать завершения всей обработки в обработчике data
. Итак, поскольку эта обработка занимает некоторое время, естественно, что событие end
произойдет до того, как вы завершите выполнение всего этого асинхронного кода для события data
.
Кроме того, если вы когда-либо может иметь более одного data
события (которое обычно происходит), вы будете иметь несколько data
событий в полете одновременно, и, поскольку вы используете фиксированное имя файла, они, вероятно, будут перезаписывать друг друга .
Обычный способ решить что-то вроде этого - stream.pause()
приостановить поток чтения в начале обработки события data
, а затем, когда все ваши асинхронные операции выполнены, вы можете затем stream.resume()
, чтобы снова начать движение.
Вам нужно будет получить правильный поток, чтобы приостановить и возобновить. Вы можете сделать что-то вроде этого:
let stream = fs.createReadStream('filename.csv')
.pipe(csv());
stream.on('data', ((row: any) => {
stream.pause();
....
});
Затем, внутри вашего s3.upload()
обратного вызова, вы можете позвонить stream.resume()
. Вам также понадобится намного, намного лучшая обработка ошибок, чем у вас, иначе вещи просто застрянут, если вы получите ошибку.
Похоже, у вас также есть другие проблемы параллелизма, когда вы звоните:
response.pipe(file);
И затем вы пытаетесь использовать file
, фактически не дожидаясь выполнения операции .pipe()
(которая также является асинхронной). В целом, вся эта логика c действительно нуждается в серьезной очистке. Я не понимаю, что именно вы пытаетесь сделать на разных этапах, чтобы узнать, как написать абсолютно чистую и более простую версию.