Я хочу иметь два параллельных цикла в современном инструменте nodeJS.
Один цикл просматривает интересные вещи и помещает их в массив.
Второй цикл сдвигает вещи с массива иобрабатывает их.
Идея состоит в том, что эти два цикла не блокируют друг друга.Второй цикл сможет наверстать упущенное в процессе обработки всякий раз, когда первый цикл выполняет много сканирования, но не находит много.Цикл обработки должен фактически ждать только в том случае, если не осталось невыполненных заданий и не поступают свежие данные.
Я реализовал цикл сканирования несколькими способами.Это легкая часть.(В этом случае я рекурсивно сканирую fs в поисках файлов определенного типа.)
Для цикла обработки у меня он работает по опросу, но я чувствую, что должен быть в состоянии заставить его работать с чистымasync / await.
Но я не могу разобраться с этим.Концептуально цикл сканирования должен выполнить обещание предупредить второй цикл о том, что в массиве что-то есть, или должен быть генератор, выдающий каждое новое значение вместо использования массива.
Но я не вижу, как это сделатьобещать снова и снова, или ждать активности массива напрямую, или делать это как генератор, не вызывая блокировку между двумя циклами.
Я, должно быть, обдумываю это!Чего мне не хватает?
Код работает через опрос, с закомментированными битами, к которым может принадлежать асинхронная / ожидающая реализация:
"use strict";
const { basename, join } = require('path')
const { promisify } = require('util')
const fs = require('fs')
const readdir = promisify(fs.readdir)
const lstat = promisify(fs.lstat)
async function* scanpaths(paths) {
for (const path of paths) {
yield* scanonepath(path)
}
}
async function* scanonepath(path) {
try {
const s = await lstat(path)
if (s.isDirectory()) {
for (const entry of await readdir(path)) {
yield* scanonepath(join(path, entry))
}
} else if (/\.[mM][pP]3$/.test(path)) {
yield { pathname:path, basename:basename(path), stat:s }
}
} catch (e) {
// special file, deleted file, etc
}
}
async function* checkqueue(buf) {
if (buf.length) {
yield buf.shift()
} else {
// TODO await something to arrive in the buf - HOW?
}
}
async function processmp3(fullname, name, stat) {
try {
console.log(fullname)
// TODO tricky processing goes here
} catch (e) {
console.log(name, e)
}
}
(async () => {
let int = null
let globaldone = false
let globalprocessing = []
let buf = []
async function poll() {
if (buf.length) {
let clone = Array.from(buf)
buf.length = 0 // NOT buf = [] as that doesn't change other refs to buf
for (let e of clone) {
globalprocessing.push(processmp3(e.pathname, e.basename, e.stat))
}
}
if (globaldone) {
await Promise.all(globalprocessing)
console.warn("*** finished processing")
} else {
setTimeout(poll, 125)
}
}
// start polling for scanned files ready to process
console.log("** start polling")
setTimeout(poll, 0)
//console.log("** start scanning")
//await enqueue(buf)
globaldone = true
console.warn("*** finished scanning")
})()
// TODO how?
async function enqueue(buf) {
// start scanning by iterating over our generator which does the recursive directory stuff
for await (const file of scanpaths(process.argv.slice(2))) {
buf.push(file)
// TODO resolve a promise to notify dequeue? or also yield this file
}
}