Я пытаюсь построить эффективный конвейер асинхронной обработки в Node.js для массива из потенциально десятков тысяч элементов. Конвейер начинается с вызова веб-API (с использованием пакета node-fetch), проходит несколько этапов синтаксического анализа / преобразования, а затем завершается добавлением файла на диске.
Однако существуют некоторые требования, которые в совокупности оказываются затрудняющими это:
В веб-API разрешено ограниченное количество запросов в минуту, поэтому я должен иметь возможность регулировать / устанавливать задержку между каждым начальным fetch
вызовом. Это означает, что эта стадия должна быть асинхронно последовательной.
Все результаты записываются в один и тот же файл и должны добавляться в том же порядке, который указан в исходном массиве, поэтому этот этап также должен быть последовательным.
В противном случае для общей производительности все должно работать максимально параллельно. Примеры:
а. Более ранние элементы должны быть в состоянии обрабатываться, включая этап записи в файл (при условии, что требование 2 выполнено), в то же время, когда более поздние элементы еще не были извлечены (из-за регулирования в точке 1).
б. Более поздние элементы должны быть отложены только на более ранние элементы (например, если у них особенно большое тело ответа API или особенно большое время анализа) на последнем этапе записи файла (чтобы удовлетворить требование 2). Для промежуточных шагов не должно быть зависимости порядка между элементами.
Следует отметить, что я использую Node 10, поэтому у меня есть асинхронные итераторы / for await
. Моя ближайшая попытка выглядит примерно так (представьте, что это в контексте асинхронной функции):
const valuePromises = [];
const delaySequence = itemArray.reduce(async (sequence, item) => {
await sequence;
const valuePromise = fetch(item.url)
.then(step1)
.then(step2)
.then(step3);
valuePromises.push(valuePromise);
return sleep(1000); // promisified setTimeout
}, Promise.resolve());
// If I don't do this the valuePromises array won't be fully populated:
await delaySequence;
for await (const value of valuePromises) {
await appendToFile(value);
}
Это работает за исключением того, что оно нарушает пункт "a" выше, потому что оно должно ждать, пока все выборки не будут запущены, прежде чем он сможет начать добавление в файл.
Я пытался поиграть с асинхронными генераторами, но не смог придумать ничего лучшего.
Я думал об использовании потоков, которые кажутся подходящими для такой задачи; они решат проблему порядка (сначала во-первых, а потом) и позволят некоторую степень параллелизма. Однако у них есть ограничение, заключающееся в том, что элемент не может пройти промежуточную стадию в конвейере раньше, чем предыдущий элемент, что нарушает «b». Я также не знаю, насколько легко заставить потоки взаимодействовать с API на основе обещаний.
Кто-нибудь знает, как этого можно достичь?