Методы моделирования динамического потока данных с API параллелизма Java - PullRequest
1 голос
/ 19 мая 2010

РЕДАКТИРОВАТЬ: Это в основном вопрос «как правильно реализовать механизм потока данных в Java», и я чувствую, что на него нельзя ответить адекватно в одном ответе (это все равно, что спросить «как правильно реализовать слой ORM» и чтобы кто-то записал детали Hibernate или чего-то еще), поэтому сочтите этот вопрос закрытым.

Существует ли элегантный способ моделирования динамического потока данных в Java? Под потоком данных я подразумеваю, что существуют различные типы задач, и эти задачи могут быть произвольно «связаны», так что когда задача завершается, последующие задачи выполняются параллельно с использованием готовых задач, выводимых в качестве входных данных, или после завершения нескольких задач их задачи вывод агрегируется в последующей задаче (см. потоковое программирование ). Под динамическим я подразумеваю, что тип и число задач-преемников, когда задача завершается, зависит от вывода этой завершенной задачи, поэтому, например, задача A может порождать задачу B, если она имеет определенный результат, но может порождать задачу C, если имеет другой вывод. Другой способ выразиться в том, что каждая задача (или набор задач) отвечает за определение следующих задач.

Пример потока данных для рендеринга веб-страницы: У меня есть следующие типы задач: загрузчик файлов, средство визуализации HTML / CSS, средство синтаксического анализа HTML / DOM, средство визуализации изображений, средство синтаксического анализа JavaScript, интерпретатор JavaScript.

  • Задача загрузки файла для файла HTML
    • Задача HTML-парсера / построителя DOM
      • Задача загрузки файла для каждого внедренного файла / ссылки
        • Если изображение, рендерер изображений
        • Если внешний JavaScript, парсер JavaScript
          • Интерпретатор JavaScript
        • В противном случае просто сохраните в каком-нибудь var / field в задании парсера HTML
      • Парсер JavaScript для каждого встроенного скрипта
        • Интерпретатор JavaScript
      • Дождитесь завершения вышеуказанных задач, затем средство рендеринга HTML / CSS (очевидно, не оптимальное или не совсем правильное, но это просто)

Я не говорю, что решение должно быть какой-то всеобъемлющей структурой (на самом деле, чем ближе к JDK API, тем лучше), и я абсолютно не хочу что-то более тяжелое, скажем Spring Web Flow или некоторую декларативную разметку или другой DSL.

Если говорить более конкретно, я пытаюсь придумать хороший способ смоделировать это в Java с помощью Callables, Executors, ExecutorCompletionServices и, возможно, различных классов синхронизаторов (таких как Semaphore или CountDownLatch). Есть пара вариантов использования и требований:

  1. Не делайте никаких предположений о том, на каких исполнителях будут выполняться задачи. На самом деле, чтобы упростить, предположим, что есть только один исполнитель. Это может быть фиксированный исполнитель пула потоков, поэтому наивная реализация может привести к взаимоблокировкам (например, представьте задачу, которая отправляет другую задачу и затем блокирует ее до завершения этой подзадачи, а теперь представьте, что некоторые из этих задач используют все потоки).
  2. Для упрощения предположим, что данные не передаются между задачами (вывод задачи-> ввод последующей задачи) - завершающая задача и последующая задача не должны существовать вместе, поэтому входные данные для следующей задачи не будут изменено предыдущим заданием (поскольку оно уже выполнено).
  3. Существует только пара операций, которые должен обрабатывать «движок» потока данных:
    1. Механизм, в котором задача может поставить в очередь больше задач
    2. Механизм, посредством которого последующая задача не ставится в очередь до тех пор, пока все необходимые входные задачи не будут завершены
    3. Механизм, посредством которого основной поток (или другие потоки, не управляемые исполнителем) блокируются до завершения потока
    4. Механизм, посредством которого основной поток (или другие потоки, не управляемые исполнителем) блокируется до тех пор, пока определенные задачи не будут завершены
  4. Поскольку поток данных является динамическим (зависит от ввода / состояния задачи), активация этих механизмов должна происходить в коде задачи, например, код в Callable сам отвечает за организацию очередей в Callable.
  5. Поток данных "внутренности" не должен подвергаться самим задачам (Callables) - только задачи, перечисленные выше, должны быть доступны для задачи.
  6. Обратите внимание, что тип данных не обязательно одинаков для всех задач, например задача загрузки файла может принять файл в качестве входных данных, но выведет строку.
  7. Если задача выдает необработанное исключение (указывающее на неустранимую ошибку, требующую остановки всей обработки потока данных), она должна распространиться до потока, который инициировал поток данных, как можно быстрее и отменить все задачи (или что-то более необычное, например, фатальная ошибка обработчик).
  8. Задачи должны быть запущены как можно скорее. Это вместе с предыдущим требованием должно исключать простой опрос в будущем + Thread.sleep ().
  9. В качестве бонуса я хотел бы, чтобы сам механизм обработки данных выполнял какое-то действие (например, ведение журнала) каждый раз, когда задача завершена или когда не было завершено в X раз с момента завершения последней задачи. Что-то вроде: ExecutorCompletionService<T> ecs; while (hasTasks()) { Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) { future.get() ... } ... }

Существуют ли простые способы сделать все это с помощью API параллелизма Java? Или, если это будет сложно, независимо от того, что доступно в JDK, есть ли легкая библиотека, которая удовлетворяет требованиям? У меня уже есть частичное решение, которое подходит для моего конкретного случая использования (оно обманывает, так как я использую двух исполнителей, и, как вы знаете, оно вообще не связано с примером веб-браузера, который я привел выше), но я Хотелось бы увидеть более универсальное и элегантное решение.

Ответы [ 2 ]

1 голос
/ 19 мая 2010

Как насчет определения интерфейса, такого как:

interface Task extends Callable {
  boolean isReady();
}

Тогда вашему «механизму потока данных» нужно будет просто управлять коллекцией объектов «Задачи», то есть разрешить очереди новых задач ставиться в очередь на исключение и разрешать запросы относительно состояния данной задачи (так что, возможно, интерфейс выше должен быть расширен для включения идентификатор и / или тип). Когда задача завершается (и, когда движок запускается, конечно), движок должен просто запросить любые незапущенные задачи, чтобы увидеть, готовы ли они сейчас, и, если да, передать их для запуска исполнителю. Как вы упомянули, любое ведение журнала и т. Д. Также может быть сделано тогда.

Еще одна вещь, которая может помочь, - это использовать Guice (http://code.google.com/p/google-guice/) или подобный облегченный DI-фреймворк, чтобы помочь правильно связать все объекты (например, чтобы убедиться, что создан правильный тип исполнителя, и убедиться, что Задачи, которым необходим доступ к механизму потока данных (например, для метода isReady или для постановки в очередь других задач), могут быть предоставлены с экземпляром без введения сложных циклических связей.

HTH, но, пожалуйста, прокомментируйте, если я пропустил какие-либо ключевые аспекты ... Пол.

0 голосов
/ 22 сентября 2011

Посмотрите на https://github.com/rfqu/df4j - простая, но мощная библиотека потоков данных.Если в нем отсутствуют некоторые желаемые функции, их можно легко добавить.

...