Я не знаю ни класса проблемы, ни соответствующей терминологии
Возможно, я бы просто назвал класс проблемы одновременной оркестровкой задачи .
Есть много вещей, которые необходимо учитывать при определении правильного подхода. Если вы предоставите более подробную информацию, я постараюсь обновить свой ответ более насыщенным цветом.
Нет ограничений, таких как «вы должны получить доступ к B, прежде чем сможете сделать X с помощью C» или подобному .
Это вообще хорошая вещь. Очень частая причина взаимоблокировок - разные потоки, получающие одинаковые блокировки в разных порядках. Например, поток 1 блокирует A, а затем B, в то время как поток 2 владеет блокировкой B и ожидает получения A. Очень важно разработать решение, чтобы такая ситуация не возникала.
Я не смог найти что-нибудь вроде SettableFuture
в JDK
Взгляните на java.util.concurrent.CompletableFuture<T>
- это, вероятно, то, что вы хотите здесь. Он выставляет блокировку get()
, а также ряд асинхронных обратных вызовов завершения, таких как thenAccept(Consumer<? super T>)
.
invokeAndWait
, как правило, считается склонным к тупику
It зависит. Если ваш вызывающий поток не удерживает никаких блокировок, которые будут необходимы для выполнения отправляемого Runnable
, вы, вероятно, в порядке. Тем не менее, если вы можете основывать свою оркестровку на асинхронных обратных вызовах, вы можете вместо этого использовать SwingUtilities.invokeLater(Runnable)
- это представит выполнение вашего Runnable
на событии Swing l oop без блокировки вызывающего потока.
Я бы, вероятно, избегал создания потока на синглтон. Каждый работающий поток вносит некоторые накладные расходы, и лучше отделить количество потоков от вашей бизнес-логики c. Это позволит вам настроить программное обеспечение на разные физические машины, например, на основе количества ядер.
Похоже, вам нужно, чтобы каждый метод runWithX(...)
был atomi c , Другими словами, как только один поток начал обращаться к X, другой поток не сможет сделать это, пока первый поток не завершит свой шаг задачи. Если это так, то создание объекта блокировки на единицу и обеспечение доступа serial (а не параллельный ) - правильный путь к go. Вы можете достичь этого, поместив выполнение замыканий, переданных в ваши методы runWithX(...)
, в кодовый блок synchronized
Java. Код в блоке также называется критическая секция или область монитора .
Другая вещь, которую следует учитывать, это конкуренция потоков и порядок исполнения. Если двум задачам требуется доступ к X, и задача 1 передается перед задачей 2, является ли требованием доступ задачи 1 к X раньше, чем задача 2? Подобное требование может немного усложнить конструкцию, и я, вероятно, рекомендую другой подход, чем описанный выше.
Есть ли какой-то превосходящий подход, который я не рассматривал?
В наши дни существуют рамки для решения подобных проблем. Я специально думаю о реактивных потоках и Rx Java. Хотя это очень мощная структура, она также имеет очень крутой курс обучения. Прежде чем внедрять такую технологию в организации, необходимо провести много анализа и анализа.
Обновление:
Основываясь на ваших отзывах, я думаю * Подход на основе 1071 *, вероятно, имеет больше смысла.
Я бы создал вспомогательный класс для организации выполнения шага задачи:
class TaskHelper
{
private final Object lockA;
private final Object lockB;
private final Object lockC;
private final Executor poolExecutor;
private final Executor swingExecutor;
public TaskHelper()
{
poolExecutor = Executors.newFixedThreadPool( 2 );
swingExecutor = SwingUtilities::invokeLater;
lockA = new Object();
lockB = new Object();
lockC = new Object();
}
public <T> CompletableFuture<T> doWithA( Supplier<T> taskStep )
{
return doWith( lockA, poolExecutor, taskStep );
}
public <T> CompletableFuture<T> doWithB( Supplier<T> taskStep )
{
return doWith( lockB, poolExecutor, taskStep );
}
public <T> CompletableFuture<T> doWithC( Supplier<T> taskStep )
{
return doWith( lockC, swingExecutor, taskStep );
}
private <T> CompletableFuture<T> doWith( Object lock, Executor executor, Supplier<T> taskStep )
{
CompletableFuture<T> future = new CompletableFuture<>();
Runnable serialTaskStep = () -> {
T result;
synchronized ( lock ) {
result = taskStep.get();
}
future.complete( result );
};
executor.execute( serialTaskStep );
return future;
}
}
В моем примере выше withA
и withB
получить расписание для общего пула потоков, в то время как withC
всегда выполняется в потоке Swing. Swing Executor
уже будет последовательным по своей природе, поэтому блокировка там действительно необязательна.
Для создания реальных задач я бы рекомендовал создать объект для каждой задачи. Это позволяет вам предоставлять обратные вызовы в качестве ссылок на методы, что приводит к чистому коду и позволяет избежать ада обратного вызова:
В этом примере вычисляется квадрат предоставленного числа в фоновом пуле потоков, а затем отображаются результаты в потоке Swing:
class SampleTask
{
private final TaskHelper helper;
private final String id;
private final int startingValue;
public SampleTask( TaskHelper helper, String id, int startingValue )
{
this.helper = helper;
this.id = id;
this.startingValue = startingValue;
}
private void start()
{
helper.doWithB( () -> {
int square = startingValue * startingValue;
return String.format( "computed-thread: %s computed-square: %d",
Thread.currentThread().getName(), square );
} )
.thenAccept( this::step2 );
}
private void step2( String result )
{
helper.doWithC( () -> {
String message = String.format( "current-thread: %s task: %s result: %s",
Thread.currentThread().getName(), id, result );
JOptionPane.showConfirmDialog( null, message );
return null;
} );
}
}
@Test
public void testConcurrent() throws InterruptedException, ExecutionException
{
TaskHelper helper = new TaskHelper();
new SampleTask( helper, "task1", 5 ).start();
new SampleTask( helper, "task2", 7 ).start();
Thread.sleep( 60000 );
}
Обновление 2:
Если вы хотите избежать ада обратного вызова, в то же время избегая необходимости создавать объект для каждой задачи, возможно, вам стоит все-таки серьезно взглянуть на реактивные потоки.
Взгляните на «начало работы» "страница для Rx Java: https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava
Для справки вот как будет выглядеть тот же пример выше в Rx (для простоты я удаляю концепцию идентификатора задачи):
@Test
public void testConcurrentRx() throws InterruptedException
{
Scheduler swingScheduler = Schedulers.from( SwingUtilities::invokeLater );
Subject<Integer> inputSubject = PublishSubject.create();
inputSubject
.flatMap( input -> Observable.just( input )
.subscribeOn( Schedulers.computation() )
.map( this::computeSquare ))
.observeOn( swingScheduler )
.subscribe( this::displayResult );
inputSubject.onNext( 5 );
inputSubject.onNext( 7 );
Thread.sleep( 60000 );
}
private String computeSquare( int input )
{
int square = input * input;
return String.format( "computed-thread: %s computed-square: %d",
Thread.currentThread().getName(), square );
}
private void displayResult( String result )
{
String message = String.format( "current-thread: %s result: %s",
Thread.currentThread().getName(), result );
JOptionPane.showConfirmDialog( null, message );
}