Асинхронная ограниченная очередь в JS / TS с использованием async / await - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь обернуть голову вокруг async/await, и у меня есть следующий код:

class AsyncQueue<T> {
    queue = Array<T>()
    maxSize = 1

    async enqueue(x: T) {
        if (this.queue.length > this.maxSize) {
            // Block until available
        }

        this.queue.unshift(x)
    }

    async dequeue() {
        if (this.queue.length == 0) {
            // Block until available
        }

        return this.queue.pop()!
    }
}

async function produce<T>(q: AsyncQueue, x: T) {
    await q.enqueue(x)
}

async function consume<T>(q: AsyncQueue): T {
    return await q.dequeue()
}

// Expecting 3 4 in the console
(async () => {
    const q = new AsyncQueue<number>()
    consume(q).then(console.log)
    consume(q).then(console.log)
    produce(q, 3)
    produce(q, 4)
    consume(q).then(console.log)
    consume(q).then(console.log)
})()

Моя проблема, конечно, заключается в частях кода "Блокировать до доступности",Я ожидал, что смогу «остановить» выполнение до тех пор, пока что-то не произойдет (например, удаление очереди не будет выполнено до появления очереди, и наоборот, учитывая доступное пространство).У меня такое чувство, что мне, возможно, понадобится использовать сопрограммы для этого, но я действительно хотел убедиться, что я просто не пропускаю никакую магию async/await.

1 Ответ

0 голосов
/ 17 мая 2018

17/04/2019 Обновление: Короче говоря, в реализации AsyncSemaphore ниже есть ошибка, которая была обнаружена с помощью тестирования на основе свойств . Вы можете прочитать все об этой "сказке" здесь . Вот исправленная версия:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()!()
    }

    async wait() {
        this.permits -= 1
        if (this.permits < 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
    }
}

Наконец, после значительных усилий и воодушевленных ответом @Titian, я думаю, что решил это. Код заполнен отладочными сообщениями, но он может служить педагогическим целям относительно потока управления:

class AsyncQueue<T> {
    waitingEnqueue = new Array<() => void>()
    waitingDequeue = new Array<() => void>()
    enqueuePointer = 0
    dequeuePointer = 0
    queue = Array<T>()
    maxSize = 1
    trace = 0

    async enqueue(x: T) {
        this.trace += 1
        const localTrace = this.trace

        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
            console.debug(`[${localTrace}] Producer Waiting`)
            this.dequeuePointer += 1
            await new Promise(r => this.waitingDequeue.unshift(r))
            this.waitingDequeue.pop()
            console.debug(`[${localTrace}] Producer Ready`)
        }

        this.queue.unshift(x)
        console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)

        if (this.enqueuePointer > 0) {
            console.debug(`[${localTrace}] Notify Consumer`)
            this.waitingEnqueue[this.enqueuePointer-1]()
            this.enqueuePointer -= 1
        }
    }

    async dequeue() {
        this.trace += 1
        const localTrace = this.trace

        console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)

        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
            console.debug(`[${localTrace}] Consumer Waiting`)
            this.enqueuePointer += 1
            await new Promise(r => this.waitingEnqueue.unshift(r))
            this.waitingEnqueue.pop()
            console.debug(`[${localTrace}] Consumer Ready`)
        }

        const x = this.queue.pop()!
        console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)

        if (this.dequeuePointer > 0) {
            console.debug(`[${localTrace}] Notify Producer`)
            this.waitingDequeue[this.dequeuePointer - 1]()
            this.dequeuePointer -= 1
        }

        return x
    }
}

Обновление: Вот чистая версия с использованием AsyncSemaphore, которая действительно инкапсулирует способ, которым все обычно делается с использованием примитивов параллелизма, но адаптирована к асинхронному CPS-single-threadaded-event-loop ™ стиль JavaScript с async/await. Вы можете видеть, что логика AsyncQueue становится намного более интуитивной, и двойная синхронизация через Promises делегируется двум семафорам :

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()()
    }

    async wait() {
        if (this.permits == 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
        this.permits -= 1
    }
}

class AsyncQueue<T> {
    private queue = Array<T>()
    private waitingEnqueue: AsyncSemaphore
    private waitingDequeue: AsyncSemaphore

    constructor(readonly maxSize: number) {
        this.waitingEnqueue = new AsyncSemaphore(0)
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    }

    async enqueue(x: T) {
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    }

    async dequeue() {
        await this.waitingEnqueue.wait()
        this.waitingDequeue.signal()
        return this.queue.pop()!
    }
}

Обновление 2: Казалось, что в приведенном выше коде скрыта тонкая ошибка, которая стала очевидной при попытке использовать AsyncQueue размера 0. Семантика имеет смысл: это очередь без какого-либо буфера, где издатель всегда ожидает существования потребителя. Линии, которые мешали ему работать, были:

await this.waitingEnqueue.wait()
this.waitingDequeue.signal()

Если вы внимательно посмотрите, вы увидите, что dequeue() не совсем симметричен enqueue(). На самом деле, если поменять местами порядок этих двух инструкций:

this.waitingDequeue.signal()
await this.waitingEnqueue.wait()

Тогда все снова работает; мне кажется интуитивно понятным, что мы сигнализируем, что есть что-то интересующее dequeuing(), прежде чем на самом деле ждать, пока enqueuing произойдет.

Я все еще не уверен, что это не приведет к появлению незначительных ошибок без тщательного тестирования. Я оставлю это как вызов;)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...