Node.js труба ручья - PullRequest
       7

Node.js труба ручья

1 голос
/ 06 мая 2020

Я разрабатываю импортер файлов, я решаю использовать потоковый подход, у меня есть ряд шагов, которые я должен выполнить, чтобы достичь своей цели, которые можно увидеть ниже:

  1. Скачать файл.
  2. Разобрать в CSV
  3. Проверить весь файл.
  4. «Перевести» и сохранить в db

Вот фрагмент, pipe, затем все:

protected async pipe(readable: Readable, transform: Transform, validator: Transform, importer: T) {
    const asyncPipeline = promisify(pipeline);
    try {
        await asyncPipeline(readable, transform, validator, importer)
        logger.info("Import Finished")
    } catch (error) {
        const { message, stack } = error
        logger.error(message, { stack })
    }
}

Вот фрагмент, который вызывает указанный выше метод pipe

super.pipe(
    response // response from http.get, 
    csv()    // csv-parser library, 
    new Validator(), 
    new Importer()
)

Класс Validator:

export class Validator extends Transform {
    constructor() {
        super({ objectMode: true })
    }

    _transform(chunk: any, encoding: string, done: any) {
        this.push(chunk)
        logger.info("Validating", { chunk })
        done()
    }

    _flush(done: any) {
        done()
    }
}

и, наконец, класс импортера :

export class Importer extends Writable {
    private buffer: Car[]

    constructor() {
        super({ objectMode: true })
        this.buffer = new Array()
    }

    _write(row: object, enc: string, next: any) {
        this.import(row)
            .then(() => next())
            .catch((error: Error) => next(error))
    }

    private async import(data: any): Promise<Car[]> {
        this.buffer.push(data)
        logger.info(this.buffer.length.toString());
        return await db.save(data) // fake method;
    }
}

Когда вызывается super.pipe, вывод чередуется с "Validation" и общим размером буферного массива.

{"level":30,"time":1588773096585,"pid":40537,"msg":"1"}
{"level":30,"time":1588773096586,"pid":40537,"msg":"Validating"}
{"level":30,"time":1588783063275,"pid":61571,"msg":"Importation finished"}
{"level":30,"time":1588773096633,"pid":40537,"msg":"2"}

Первый способ есть? Я выполняю поток валидатора, а после go в поток импортера?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...