Я разрабатываю импортер файлов, я решаю использовать потоковый подход, у меня есть ряд шагов, которые я должен выполнить, чтобы достичь своей цели, которые можно увидеть ниже:
- Скачать файл.
- Разобрать в CSV
- Проверить весь файл.
- «Перевести» и сохранить в db
Вот фрагмент, pipe, затем все:
protected async pipe(readable: Readable, transform: Transform, validator: Transform, importer: T) {
const asyncPipeline = promisify(pipeline);
try {
await asyncPipeline(readable, transform, validator, importer)
logger.info("Import Finished")
} catch (error) {
const { message, stack } = error
logger.error(message, { stack })
}
}
Вот фрагмент, который вызывает указанный выше метод pipe
super.pipe(
response // response from http.get,
csv() // csv-parser library,
new Validator(),
new Importer()
)
Класс Validator:
export class Validator extends Transform {
constructor() {
super({ objectMode: true })
}
_transform(chunk: any, encoding: string, done: any) {
this.push(chunk)
logger.info("Validating", { chunk })
done()
}
_flush(done: any) {
done()
}
}
и, наконец, класс импортера :
export class Importer extends Writable {
private buffer: Car[]
constructor() {
super({ objectMode: true })
this.buffer = new Array()
}
_write(row: object, enc: string, next: any) {
this.import(row)
.then(() => next())
.catch((error: Error) => next(error))
}
private async import(data: any): Promise<Car[]> {
this.buffer.push(data)
logger.info(this.buffer.length.toString());
return await db.save(data) // fake method;
}
}
Когда вызывается super.pipe, вывод чередуется с "Validation" и общим размером буферного массива.
{"level":30,"time":1588773096585,"pid":40537,"msg":"1"}
{"level":30,"time":1588773096586,"pid":40537,"msg":"Validating"}
{"level":30,"time":1588783063275,"pid":61571,"msg":"Importation finished"}
{"level":30,"time":1588773096633,"pid":40537,"msg":"2"}
Первый способ есть? Я выполняю поток валидатора, а после go в поток импортера?