ИМХО, я бы попытался использовать мощь rxjs, поскольку мы все равно используем его здесь и избегаем реализации пользовательской концепции организации очереди, как это предлагается в другом ответе (хотя вы, конечно, можете это сделать).
Если мы немного упростим данный случай, у нас просто есть некоторые наблюдаемые и мы хотим выполнить длительную процедуру для каждого излучения - в последовательности . rxjs позволяет сделать это с помощью оператора concatMap
, по сути, из коробки:
$data.pipe(concatMap(item => processItem(item))).subscribe();
Это только предполагает, что processItem
возвращает наблюдаемое. Поскольку вы использовали await
, я предполагаю, что ваши функции в настоящий момент возвращают обещания. Их можно легко преобразовать в наблюдаемые с помощью from
.
Единственная деталь, которую осталось рассмотреть из OP, - это то, что наблюдаемое фактически испускает массив элементов, и мы хотим выполнить операцию с каждым элементом каждого излучения. Чтобы сделать это, мы просто выравниваем наблюдаемое, используя mergeMap
.
Давайте сложим все это вместе. Обратите внимание, что если вы уберете подготовку некоторых данных-заглушек и ведение журнала, фактическая реализация этого будет всего лишь двумя строками кода (с использованием mergeMap + concatMap).
const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;
// Stub for the long-running operation
function processTask(task) {
console.log("Processing task: ", task);
return new Promise(resolve => {
setTimeout(() => {
console.log("Finished task: ", task);
resolve(task);
}, 500 * Math.random() + 300);
});
}
// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));
// Some stubbed data stream
const tasks$ = interval(250).pipe(
take(9),
bufferCount(3),
);
tasks$.pipe(
tap(task => console.log("Received task: ", task)),
// Flatten the tasks array since we want to work in sequence anyway
mergeMap(tasks => tasks),
// Process each task, but do so consecutively
concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>