Как сделать функцию с событиями и конвейером в nodejs - PullRequest
0 голосов
/ 17 февраля 2020

Я пишу библиотеку nodejs на машинописном компьютере, основной областью этой библиотеки будет загрузка материалов с заданного URL-адреса. Я бы хотел, чтобы это использовалось следующим образом:

import library from 'library'

library('https://www.example.com')
    .on('progress', (progress: progress) => {
        //do something with the progress
    })
    .on('end', () => {
        //do something when done
    })
    .pipe(fs.createWriteStream('./test/file.mp4'))

Я никогда не работал с событиями узла и потоком таким образом, поэтому я понятия не имею, как даже go об этом, я использую машинописный текст и веб-пакет, также, пожалуйста, простите за плохой английский sh

1 Ответ

0 голосов
/ 18 февраля 2020

Вам понадобится реализовать Читаемый поток . Потоки узлов являются экземплярами EventEmitter , поэтому вы автоматически получаете доступ к API событий.

Как минимум, вам необходимо реализовать метод _read(), который вызывается внутри системы всякий раз, когда потребитель готов получить больше данных из очереди. Поскольку вы хотите, чтобы библиотека сообщала о прогрессе, вам также необходимо отслеживать, сколько данных было обработано, и соответственно генерировать события.

Приведенный ниже код игнорирует несколько важных вещей, таких как обратное давление , но это начало. Я использую node-fetch в качестве библиотеки запросов, поскольку она предоставляет базовый поток ответов и довольно проста в использовании.

// fileLoader.js
const {Readable} = require('stream')
const fetch = require('node-fetch')

class FileLoader extends Readable {
  constructor(url) {
    super()
    this._url = url
    this._fetchStarted = false
    this._totalLength = 0
    this._currentLength = 0
  }

  _processData(stream) {
    stream
      .on('end', () => {
        this.push(null)
      })
      .on('error', (err) => {
        this.destroy(err)
      })
      .on('data', (chunk) => {
        this._currentLength += chunk.length
        if (this._totalLength) {
          this.emit('progress', Math.round(this._currentLength / this._totalLength * 100))
        }
        this.push(chunk)
      })
  }

  _startFetch() {
    fetch(this._url)
      .then((res) => {
        if (!res.ok) {
          return this.destroy(new Error(`fetch resulted in ${res.status}`))
        }
        this._totalLength = res.headers.get('content-length')
        this._processData(res.body)
      })
      .catch((err) => {
        return this.destroy(new Error(err))
      })
  }

  _read() {
    if (!this._fetchStarted) {
      this._fetchStarted = true
      this._startFetch()
    }
  }
}

module.exports.loadFile = (url) => new FileLoader(url)

И код потребителя:

// consumer.js
const fs = require('fs')
const {loadFile} = require('./fileLoader')

loadFile('http://example.com/video.mp4')
  .on('progress', (progress) => {
    console.log(`${progress}%`)
  })
  .on('end', () => {
    console.log('done')
  })
  .on('error', (err) => {
    console.log(err)
  })
  .pipe(fs.createWriteStream('./tempy.mp4'))
...