РЕДАКТИРОВАТЬ: Это в основном вопрос «как правильно реализовать механизм потока данных в Java», и я чувствую, что на него нельзя ответить адекватно в одном ответе (это все равно, что спросить «как правильно реализовать слой ORM» и чтобы кто-то записал детали Hibernate или чего-то еще), поэтому сочтите этот вопрос закрытым.
Существует ли элегантный способ моделирования динамического потока данных в Java? Под потоком данных я подразумеваю, что существуют различные типы задач, и эти задачи могут быть произвольно «связаны», так что когда задача завершается, последующие задачи выполняются параллельно с использованием готовых задач, выводимых в качестве входных данных, или после завершения нескольких задач их задачи вывод агрегируется в последующей задаче (см. потоковое программирование ). Под динамическим я подразумеваю, что тип и число задач-преемников, когда задача завершается, зависит от вывода этой завершенной задачи, поэтому, например, задача A может порождать задачу B, если она имеет определенный результат, но может порождать задачу C, если имеет другой вывод. Другой способ выразиться в том, что каждая задача (или набор задач) отвечает за определение следующих задач.
Пример потока данных для рендеринга веб-страницы: У меня есть следующие типы задач: загрузчик файлов, средство визуализации HTML / CSS, средство синтаксического анализа HTML / DOM, средство визуализации изображений, средство синтаксического анализа JavaScript, интерпретатор JavaScript.
- Задача загрузки файла для файла HTML
- Задача HTML-парсера / построителя DOM
- Задача загрузки файла для каждого внедренного файла / ссылки
- Если изображение, рендерер изображений
- Если внешний JavaScript, парсер JavaScript
- В противном случае просто сохраните в каком-нибудь var / field в задании парсера HTML
- Парсер JavaScript для каждого встроенного скрипта
- Дождитесь завершения вышеуказанных задач, затем средство рендеринга HTML / CSS (очевидно, не оптимальное или не совсем правильное, но это просто)
Я не говорю, что решение должно быть какой-то всеобъемлющей структурой (на самом деле, чем ближе к JDK API, тем лучше), и я абсолютно не хочу что-то более тяжелое, скажем Spring Web Flow или некоторую декларативную разметку или другой DSL.
Если говорить более конкретно, я пытаюсь придумать хороший способ смоделировать это в Java с помощью Callables, Executors, ExecutorCompletionServices и, возможно, различных классов синхронизаторов (таких как Semaphore или CountDownLatch). Есть пара вариантов использования и требований:
- Не делайте никаких предположений о том, на каких исполнителях будут выполняться задачи. На самом деле, чтобы упростить, предположим, что есть только один исполнитель. Это может быть фиксированный исполнитель пула потоков, поэтому наивная реализация может привести к взаимоблокировкам (например, представьте задачу, которая отправляет другую задачу и затем блокирует ее до завершения этой подзадачи, а теперь представьте, что некоторые из этих задач используют все потоки).
- Для упрощения предположим, что данные не передаются между задачами (вывод задачи-> ввод последующей задачи) - завершающая задача и последующая задача не должны существовать вместе, поэтому входные данные для следующей задачи не будут изменено предыдущим заданием (поскольку оно уже выполнено).
- Существует только пара операций, которые должен обрабатывать «движок» потока данных:
- Механизм, в котором задача может поставить в очередь больше задач
- Механизм, посредством которого последующая задача не ставится в очередь до тех пор, пока все необходимые входные задачи не будут завершены
- Механизм, посредством которого основной поток (или другие потоки, не управляемые исполнителем) блокируются до завершения потока
- Механизм, посредством которого основной поток (или другие потоки, не управляемые исполнителем) блокируется до тех пор, пока определенные задачи не будут завершены
- Поскольку поток данных является динамическим (зависит от ввода / состояния задачи), активация этих механизмов должна происходить в коде задачи, например, код в Callable сам отвечает за организацию очередей в Callable.
- Поток данных "внутренности" не должен подвергаться самим задачам (Callables) - только задачи, перечисленные выше, должны быть доступны для задачи.
- Обратите внимание, что тип данных не обязательно одинаков для всех задач, например задача загрузки файла может принять файл в качестве входных данных, но выведет строку.
- Если задача выдает необработанное исключение (указывающее на неустранимую ошибку, требующую остановки всей обработки потока данных), она должна распространиться до потока, который инициировал поток данных, как можно быстрее и отменить все задачи (или что-то более необычное, например, фатальная ошибка обработчик).
- Задачи должны быть запущены как можно скорее. Это вместе с предыдущим требованием должно исключать простой опрос в будущем + Thread.sleep ().
- В качестве бонуса я хотел бы, чтобы сам механизм обработки данных выполнял какое-то действие (например, ведение журнала) каждый раз, когда задача завершена или когда не было завершено в X раз с момента завершения последней задачи. Что-то вроде:
ExecutorCompletionService<T> ecs; while (hasTasks()) { Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) { future.get() ... } ... }
Существуют ли простые способы сделать все это с помощью API параллелизма Java? Или, если это будет сложно, независимо от того, что доступно в JDK, есть ли легкая библиотека, которая удовлетворяет требованиям? У меня уже есть частичное решение, которое подходит для моего конкретного случая использования (оно обманывает, так как я использую двух исполнителей, и, как вы знаете, оно вообще не связано с примером веб-браузера, который я привел выше), но я Хотелось бы увидеть более универсальное и элегантное решение.