Я не знаю, есть ли там пакет, предоставляющий эту функциональность, но, поскольку не так сложно написать собственный лог c, я сделал следующий пример:
import 'dart:async';
import 'dart:collection';
import 'dart:math';
class TaskRunner<A, B> {
final Queue<A> _input = Queue();
final StreamController<B> _streamController = StreamController();
final Future<B> Function(A) task;
final int maxConcurrentTasks;
int runningTasks = 0;
TaskRunner(this.task, {this.maxConcurrentTasks = 5});
Stream<B> get stream => _streamController.stream;
void add(A value) {
_input.add(value);
_startExecution();
}
void addAll(Iterable<A> iterable) {
_input.addAll(iterable);
_startExecution();
}
void _startExecution() {
if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
return;
}
while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
runningTasks++;
print('Concurrent workers: $runningTasks');
task(_input.removeFirst()).then((value) async {
_streamController.add(value);
while (_input.isNotEmpty) {
_streamController.add(await task(_input.removeFirst()));
}
runningTasks--;
print('Concurrent workers: $runningTasks');
});
}
}
}
Random _rnd = Random();
Future<List<String>> crawl(String x) =>
Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));
void main() {
final runner = TaskRunner(crawl, maxConcurrentTasks: 3);
runner.stream.forEach((listOfString) {
if (listOfString.length == 1) {
print('DONE: ${listOfString.first}');
} else {
print('PUTTING STRINGS ON QUEUE: $listOfString');
runner.addAll(listOfString);
}
});
runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}
Который выводы:
Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70
Я уверен, что удобство использования класса можно улучшить, но я думаю, что основная концепция достаточно проста для понимания. Концепция состоит в том, что мы определяем Queue
, и каждый раз, когда мы добавляем что-то к этому Queue
, мы проверяем, можем ли мы начать выполнение новых задач asyn c. В противном случае мы просто пропустим его, так как мы гарантируем, что каждая текущая запущенная задача asyn c будет проверять наличие большего количества содержимого на Queue
перед «закрытием».
Результаты возвращаются Stream
, которое вы может подписаться и, например, добавить больше контента в TaskRunner
на основе результата, как я показываю в моем примере. Порядок, в котором возвращаются данные, зависит от порядка их завершения.
Важно, чтобы это НЕ способ выполнять задачи в нескольких потоках. Весь код выполняется в одном изолированном потоке Dart, но поскольку HTTP-запросы задерживаются на ввод-вывод, есть смысл в попытке создать несколько Future
и дождаться результата.