Оберните функцию обратного вызова набора результатов генератором / итератором - PullRequest
0 голосов
/ 29 января 2019

Я работаю над преобразованием устаревшего API на основе обратных вызовов в асинхронную библиотеку.Но я просто не могу обернуться, заставляя «набор результатов» работать как генератор (Узел 10.x).

Исходный API работает так:

api.prepare((err, rs) => {
    rs.fetchRows(
        (err, row) => {
            // this callback is called as many times as rows exist
            console.log("here's a row:", row);
        },
        () => {
            console.log("we're done, data exausted");
        }
    );
});

НоВот как я хочу его использовать:

const wrapped = new ApiWrapper(api);
const rs = await wrapped.prepare({});
for (let row of rs.rows()) {
    console.log("here's a row:", row);
}

let row;
while(row = await rs.next()) {
    console.log("here's a row:", row);
}

Я думал, что у меня это под контролем генераторов, но похоже, что вы не можете использовать yield внутри обратного вызова.Это на самом деле кажется логичным, если подумать.

class ApiWrapper {
    constructor(api) {
        this.api = api;
    }
    prepare() {
        return new Promise((resolve, reject) => {
            this.api.prepare((err, rs) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(rs);
                }
            });
        });
    }
    *rows() {
        this.api.fetchRows((err, row) => {
            if (err) {
                throw err;
            } else {
                yield row; // nope, not allowed here
            }
        });
    }
    next() { ... }
}

Итак, какие у меня есть альтернативы?

Важно : я не хочу ничего хранить в массивезатем повторим это, мы говорим о гигабайтах данных строк.

Edit Я могу смоделировать поведение, которое я хочу, используя stream.Readable, но оно предупреждает меня, что этоэкспериментальная особенность.Вот упрощенная версия проблемы, которую я пытаюсь решить, используя stream:

const stream = require('stream');
function gen(){
    const s = new stream.Readable({
        objectMode: true,
        read(){
            [11, 22, 33].forEach(row => {
                this.push({ value: row });
            });
            this.push(null)
        }
    });
    return s;
}

for await (let row of gen()) {
    console.log(row);
}

// { value: 11 }
// { value: 22 }
// { value: 33 }

(node:97157) ExperimentalWarning: Readable[Symbol.asyncIterator] is an experimental feature. This feature could change at any time

1 Ответ

0 голосов
/ 30 января 2019

Я наконец понял, что мне нужно что-то похожее на каналы Go, которые были async/await совместимы.По сути, ответ заключается в синхронизации асинхронного итератора и обратного вызова, заставляя их ждать друг друга по мере использования next() итераций.

Лучшее (Узел) собственное решение, которое я нашел, состояло в том, чтобыиспользуйте stream в качестве итератора, который поддерживается в Node 10.x, но помечен как экспериментальный.Я также пытался реализовать его с помощью модуля p-defer NPM, но это оказалось более сложным, чем я ожидал.Наконец, наткнулся на модуль https://www.npmjs.com/package/@nodeguy/channel, который был именно тем, что мне было нужно:

const Channel = require('@nodeguy/channel');

class ApiWrapper {
    // ...
    rows() {
        const channel = new Channel();
        const iter = {
            [Symbol.asyncIterator]() {
                return this;
            },
            async next() {
                const val = await channel.shift();
                if (val === undefined) {
                    return { done: true };
                } else {
                    return { done: false, value: val };
                }
            }
        };

        this.api.fetchRows(async (err, row) => {
            await channel.push(row);
        }).then(() => channel.close());

        return iter;
    }
}

// then later

for await (let row of rs.rows()) {
     console.log(row)
}

Обратите внимание, как каждое ядро ​​итерирующей функции, next() и rows(), имеет await, который будетзадайте, сколько данных может быть передано по каналу, иначе обратный вызов может закончиться бесконтрольным помещением данных в очередь канала.Идея состоит в том, что обратный вызов должен ждать, пока данные будут использованы итератором next(), прежде чем нажимать more.

Вот более самостоятельный пример:

const Channel = require('@nodeguy/channel');

function iterating() {
    const channel = Channel();

    const iter = {
        [Symbol.asyncIterator]() {
            return this;
        },
        async next() {
            console.log('next');
            const val = await channel.shift();
            if (val === undefined) {
                return { done: true };
            } else {
                return { done: false, value: val };
            }
        }
    };

    [11, 22, 33].forEach(async it => {
        await channel.push(it);
        console.log('pushed', it);
    });

    console.log('returned');
    return iter;
}

(async function main() {
    for await (let it of iterating()) {
        console.log('got', it);
    }
})();

/*
returned
next
pushed 11
got 11
next
pushed 22
got 22
next
pushed 33
got 33
next
*/

Как я уже сказал,Для этого можно использовать потоки и / или обещания, но модуль Channel решает некоторые сложности, делающие его более интуитивным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...