Как реализовать очередь задач asyn c с несколькими одновременными работниками (asyn c) в dart - PullRequest
0 голосов
/ 13 июля 2020

Моя цель - создать своего рода веб-краулер на dart. Для этого я хочу поддерживать очередь задач, в которой хранятся элементы, которые необходимо сканировать (например, URL-адреса). Сканирование элементов выполняется с помощью функции сканирования, которая возвращает список дополнительных элементов, которые необходимо обработать. Таким образом, эти элементы добавляются в очередь. Пример кода:

import "dart:collection";
final queue = Queue<String>();
main() async{
  queue
    ..add("...")
    ..add("...")
    ..add("...");
  while (queue.isNotEmpty) {
    results = await crawl(queue.removeFirst());
    queue.addAll(results);
  }
}

Future<List<String>> crawl(String x) async {
  ...
  res = await http.get(x)
  ...
  return results;
}

Этот грубый код обрабатывает только один элемент за раз. Однако я хочу иметь пул рабочих (например, 5), которые берут элементы из очереди и обрабатывают их одновременно, а результаты возвращают в очередь. Поскольку узким местом является HTTP-запрос, я думаю, что вызов Future.wait () с несколькими рабочими может ускорить выполнение. Однако я не хочу перегружать серверы и, следовательно, я также хочу ограничить количество рабочих.

Можно ли это реализовать с помощью базовых c asyn c примитивов и семафоров? Я бы хотел по возможности избегать изолятов, чтобы решение было как можно проще.

1 Ответ

1 голос
/ 13 июля 2020

Я не знаю, есть ли там пакет, предоставляющий эту функциональность, но, поскольку не так сложно написать собственный лог c, я сделал следующий пример:

import 'dart:async';
import 'dart:collection';
import 'dart:math';

class TaskRunner<A, B> {
  final Queue<A> _input = Queue();
  final StreamController<B> _streamController = StreamController();
  final Future<B> Function(A) task;

  final int maxConcurrentTasks;
  int runningTasks = 0;

  TaskRunner(this.task, {this.maxConcurrentTasks = 5});

  Stream<B> get stream => _streamController.stream;

  void add(A value) {
    _input.add(value);
    _startExecution();
  }

  void addAll(Iterable<A> iterable) {
    _input.addAll(iterable);
    _startExecution();
  }

  void _startExecution() {
    if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
      return;
    }

    while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
      runningTasks++;
      print('Concurrent workers: $runningTasks');

      task(_input.removeFirst()).then((value) async {
        _streamController.add(value);

        while (_input.isNotEmpty) {
          _streamController.add(await task(_input.removeFirst()));
        }

        runningTasks--;
        print('Concurrent workers: $runningTasks');
      });
    }
  }
}

Random _rnd = Random();
Future<List<String>> crawl(String x) =>
    Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));

void main() {
  final runner = TaskRunner(crawl, maxConcurrentTasks: 3);

  runner.stream.forEach((listOfString) {
    if (listOfString.length == 1) {
      print('DONE: ${listOfString.first}');
    } else {
      print('PUTTING STRINGS ON QUEUE: $listOfString');
      runner.addAll(listOfString);
    }
  });

  runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}

Который выводы:

Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70

Я уверен, что удобство использования класса можно улучшить, но я думаю, что основная концепция достаточно проста для понимания. Концепция состоит в том, что мы определяем Queue, и каждый раз, когда мы добавляем что-то к этому Queue, мы проверяем, можем ли мы начать выполнение новых задач asyn c. В противном случае мы просто пропустим его, так как мы гарантируем, что каждая текущая запущенная задача asyn c будет проверять наличие большего количества содержимого на Queue перед «закрытием».

Результаты возвращаются Stream, которое вы может подписаться и, например, добавить больше контента в TaskRunner на основе результата, как я показываю в моем примере. Порядок, в котором возвращаются данные, зависит от порядка их завершения.

Важно, чтобы это НЕ способ выполнять задачи в нескольких потоках. Весь код выполняется в одном изолированном потоке Dart, но поскольку HTTP-запросы задерживаются на ввод-вывод, есть смысл в попытке создать несколько Future и дождаться результата.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...