Использование async / await для ожидания поступления данных в массив - PullRequest
0 голосов
/ 08 апреля 2019

Я хочу иметь два параллельных цикла в современном инструменте nodeJS.

Один цикл просматривает интересные вещи и помещает их в массив.

Второй цикл сдвигает вещи с массива иобрабатывает их.

Идея состоит в том, что эти два цикла не блокируют друг друга.Второй цикл сможет наверстать упущенное в процессе обработки всякий раз, когда первый цикл выполняет много сканирования, но не находит много.Цикл обработки должен фактически ждать только в том случае, если не осталось невыполненных заданий и не поступают свежие данные.

Я реализовал цикл сканирования несколькими способами.Это легкая часть.(В этом случае я рекурсивно сканирую fs в поисках файлов определенного типа.)

Для цикла обработки у меня он работает по опросу, но я чувствую, что должен быть в состоянии заставить его работать с чистымasync / await.

Но я не могу разобраться с этим.Концептуально цикл сканирования должен выполнить обещание предупредить второй цикл о том, что в массиве что-то есть, или должен быть генератор, выдающий каждое новое значение вместо использования массива.

Но я не вижу, как это сделатьобещать снова и снова, или ждать активности массива напрямую, или делать это как генератор, не вызывая блокировку между двумя циклами.

Я, должно быть, обдумываю это!Чего мне не хватает?


Код работает через опрос, с закомментированными битами, к которым может принадлежать асинхронная / ожидающая реализация:

"use strict";

const { basename, join } = require('path')

const { promisify } = require('util')
const fs = require('fs')
const readdir = promisify(fs.readdir)
const lstat = promisify(fs.lstat)

async function* scanpaths(paths) {
    for (const path of paths) {
        yield* scanonepath(path)
    }
}

async function* scanonepath(path) {
    try {
        const s = await lstat(path)

        if (s.isDirectory()) {
            for (const entry of await readdir(path)) {
                yield* scanonepath(join(path, entry))
            }
        } else if (/\.[mM][pP]3$/.test(path)) {
            yield { pathname:path, basename:basename(path), stat:s }
        }
    } catch (e) {
        // special file, deleted file, etc
    }
}

async function* checkqueue(buf) {
    if (buf.length) {
        yield buf.shift()
    } else {
        // TODO await something to arrive in the buf - HOW?
    }
}

async function processmp3(fullname, name, stat) {
    try {
        console.log(fullname)
        // TODO tricky processing goes here
    } catch (e) {
        console.log(name, e)
    }
}

(async () => {
    let int = null
    let globaldone = false
    let globalprocessing = []
    let buf = []

    async function poll() {
        if (buf.length) {
            let clone = Array.from(buf)
            buf.length = 0  // NOT buf = [] as that doesn't change other refs to buf

            for (let e of clone) {
                globalprocessing.push(processmp3(e.pathname, e.basename, e.stat))
            }
        }

        if (globaldone) {
            await Promise.all(globalprocessing)
            console.warn("*** finished processing")
        } else {
            setTimeout(poll, 125)
        }
    }

    // start polling for scanned files ready to process
    console.log("** start polling")
    setTimeout(poll, 0)

    //console.log("** start scanning")
    //await enqueue(buf)

    globaldone = true
    console.warn("*** finished scanning")
})()

// TODO how?
async function enqueue(buf) {
    // start scanning by iterating over our generator which does the recursive directory stuff
    for await (const file of scanpaths(process.argv.slice(2))) {
        buf.push(file)
        // TODO resolve a promise to notify dequeue? or also yield this file
    }
}

1 Ответ

0 голосов
/ 08 апреля 2019

Пусть цикл сканера, когда он что-то находит и толкает, вызывает функцию, сообщающую очереди обработки, что что-то было добавлено в массив.Если эта функция вызывается в то время, когда очередь обработки активно что-то делает, ничего не делайте - в противном случае, если очередь обработки простаивает, это скажет очереди обработки сместить элемент из массива сканирования и обработать его.

Конечно, когда очередь обработки завершает обработку элемента, попросите его проверить, есть ли дополнительные элементы для обработки, и, если они есть, немедленно обработать следующий.Это гарантирует, что очередь обработки будет бездействующей, только когда больше нет элементов для обработки, и она начнет работать снова, как только будет выдвинут новый элемент.

Например, с вашим кодом вы могли бывместо этого сделайте что-нибудь подобное:

let processing = false;
async function processNext() {
  if (processing) {
    return;
  }
  processing = true;
  const e = buf.shift();
  const mp3 = await processmp3(e.pathname, e.basename, e.stat);
  // do something with parsed mp3?
  processing = false;
  processNext();
}


async function enqueue(buf) {
  for await (const file of scanpaths(process.argv.slice(2))) {
    buf.push(file);
    processNext();
  }
}
...