17/04/2019 Обновление: Короче говоря, в реализации AsyncSemaphore ниже есть ошибка, которая была обнаружена с помощью тестирования на основе свойств . Вы можете прочитать все об этой "сказке" здесь . Вот исправленная версия:
class AsyncSemaphore {
private promises = Array<() => void>()
constructor(private permits: number) {}
signal() {
this.permits += 1
if (this.promises.length > 0) this.promises.pop()!()
}
async wait() {
this.permits -= 1
if (this.permits < 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
}
}
Наконец, после значительных усилий и воодушевленных ответом @Titian, я думаю, что решил это. Код заполнен отладочными сообщениями, но он может служить педагогическим целям относительно потока управления:
class AsyncQueue<T> {
waitingEnqueue = new Array<() => void>()
waitingDequeue = new Array<() => void>()
enqueuePointer = 0
dequeuePointer = 0
queue = Array<T>()
maxSize = 1
trace = 0
async enqueue(x: T) {
this.trace += 1
const localTrace = this.trace
if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
console.debug(`[${localTrace}] Producer Waiting`)
this.dequeuePointer += 1
await new Promise(r => this.waitingDequeue.unshift(r))
this.waitingDequeue.pop()
console.debug(`[${localTrace}] Producer Ready`)
}
this.queue.unshift(x)
console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)
if (this.enqueuePointer > 0) {
console.debug(`[${localTrace}] Notify Consumer`)
this.waitingEnqueue[this.enqueuePointer-1]()
this.enqueuePointer -= 1
}
}
async dequeue() {
this.trace += 1
const localTrace = this.trace
console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)
if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
console.debug(`[${localTrace}] Consumer Waiting`)
this.enqueuePointer += 1
await new Promise(r => this.waitingEnqueue.unshift(r))
this.waitingEnqueue.pop()
console.debug(`[${localTrace}] Consumer Ready`)
}
const x = this.queue.pop()!
console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)
if (this.dequeuePointer > 0) {
console.debug(`[${localTrace}] Notify Producer`)
this.waitingDequeue[this.dequeuePointer - 1]()
this.dequeuePointer -= 1
}
return x
}
}
Обновление: Вот чистая версия с использованием AsyncSemaphore
, которая действительно инкапсулирует способ, которым все обычно делается с использованием примитивов параллелизма, но адаптирована к асинхронному CPS-single-threadaded-event-loop ™ стиль JavaScript с async/await
. Вы можете видеть, что логика AsyncQueue
становится намного более интуитивной, и двойная синхронизация через Promises делегируется двум семафорам :
class AsyncSemaphore {
private promises = Array<() => void>()
constructor(private permits: number) {}
signal() {
this.permits += 1
if (this.promises.length > 0) this.promises.pop()()
}
async wait() {
if (this.permits == 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
this.permits -= 1
}
}
class AsyncQueue<T> {
private queue = Array<T>()
private waitingEnqueue: AsyncSemaphore
private waitingDequeue: AsyncSemaphore
constructor(readonly maxSize: number) {
this.waitingEnqueue = new AsyncSemaphore(0)
this.waitingDequeue = new AsyncSemaphore(maxSize)
}
async enqueue(x: T) {
await this.waitingDequeue.wait()
this.queue.unshift(x)
this.waitingEnqueue.signal()
}
async dequeue() {
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
return this.queue.pop()!
}
}
Обновление 2: Казалось, что в приведенном выше коде скрыта тонкая ошибка, которая стала очевидной при попытке использовать AsyncQueue
размера 0. Семантика имеет смысл: это очередь без какого-либо буфера, где издатель всегда ожидает существования потребителя. Линии, которые мешали ему работать, были:
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
Если вы внимательно посмотрите, вы увидите, что dequeue()
не совсем симметричен enqueue()
. На самом деле, если поменять местами порядок этих двух инструкций:
this.waitingDequeue.signal()
await this.waitingEnqueue.wait()
Тогда все снова работает; мне кажется интуитивно понятным, что мы сигнализируем, что есть что-то интересующее dequeuing()
, прежде чем на самом деле ждать, пока enqueuing
произойдет.
Я все еще не уверен, что это не приведет к появлению незначительных ошибок без тщательного тестирования. Я оставлю это как вызов;)