Как пропустить последовательность действий через несколько потоков? - PullRequest
1 голос
/ 20 апреля 2020

Я изучаю проблему, которая, вероятно, является частным случаем класса проблемы, но я не знаю ни класса проблемы, ни соответствующей терминологии, поэтому мне приходится прибегать к описанию проблемы с помощью ad-ho c словарный запас. Я перефразирую, как только узнаю правильную терминологию.

У меня есть куча синглетонов A, B, C. Синглтоны:

  • Не связаны. Нет ограничений, таких как «вы должны получить доступ к B, прежде чем сможете сделать X с C» или аналогичным.
  • Не поточно-ориентированный.

Система принимает задачи для обработки параллельно насколько это возможно.
Каждое задание состоит из последовательности действий, каждое из которых должно быть выполнено с использованием одного из синглетонов. Разные задачи могут обращаться к разным одноэлементным файлам в разном порядке, и задачи могут содержать циклы действий.

Псевдокод:

void myTask(in1, in2, ...) {
    doWithA(() -> {
       // use in1, in2, ...
       // inspect and/or update A
       // set up outputs to be used as inputs for the next action:
       outA1 = ...
       outA2 = ...
       ...
    });
    doWithB(() -> {
       // use outA1, outA2, ...
       // inspect and/or update B
       // set up outputs to be used as inputs for the next action:
       outB1 = ...
       outB2 = ...
       ...
    });
    // Tasks may touch singletons repeatedly, in any order
    doWithA(() -> {
       // outB1, outB2, ..., inspect/modify A, set up outputs
       outAx1 = ...
       outAx2 = ...
       ...
    });
    // Tasks may have loops:
    while (conditionInC(() -> ...) {
        doWithC(() -> ...);
        doWithD(() -> ...);
    }
    // I am aware that a loop like this can cause a livelock.
    // That's an aspect for another question, on another day.
}

Существует несколько задач, таких как myTask выше.
Задачи для быть выполненными, завернуты в замыкание и запланированы на ThreadPoolExecutor (или что-то подобное).

Подходы, которые я рассмотрел:

  1. Есть синглтоны LockA, LockB,. ..
    Каждый doWithX является просто блоком synchronized(X).
    OutXn являются локальными переменными myTask.
    Проблема : один из синглетонов - Swing, и Я не могу переместить EDT в поток, которым я управляю.
  2. Как указано выше. Решите проблему Swing из подхода (1), кодируя doWithSwing(){...} как SwingUtilities.invokeAndWait(() -> {...}.
    Задача : invokeAndWait обычно считается склонной к тупику. Как мне выяснить, не сталкиваюсь ли я с подобными проблемами с описанным выше шаблоном?
  3. Есть потоки threadA, threadB, ..., каждый из которых "владеет" одним из синглетонов (Swing уже имеет это, это EDT).
    doWithX планирует блок как Runnable на threadX.
    outXn установлены как Future<...> outXn = new SettableFuture<>(), назначения становятся outXn.set(...).
    Проблема : Я не смог найти ничего подобного SettableFuture в JDK; все способы создания Future, которые я мог найти, были так или иначе связаны с ThreadPool. Может быть, я смотрю на неправильный интерфейс верхнего уровня, и Future - это красная сельдь?

С этими подходами было бы лучше?
Есть ли такой превосходный подход, которого я не делал рассмотреть?

1 Ответ

2 голосов
/ 20 апреля 2020

Я не знаю ни класса проблемы, ни соответствующей терминологии

Возможно, я бы просто назвал класс проблемы одновременной оркестровкой задачи .

Есть много вещей, которые необходимо учитывать при определении правильного подхода. Если вы предоставите более подробную информацию, я постараюсь обновить свой ответ более насыщенным цветом.

Нет ограничений, таких как «вы должны получить доступ к B, прежде чем сможете сделать X с помощью C» или подобному .

Это вообще хорошая вещь. Очень частая причина взаимоблокировок - разные потоки, получающие одинаковые блокировки в разных порядках. Например, поток 1 блокирует A, а затем B, в то время как поток 2 владеет блокировкой B и ожидает получения A. Очень важно разработать решение, чтобы такая ситуация не возникала.

Я не смог найти что-нибудь вроде SettableFuture в JDK

Взгляните на java.util.concurrent.CompletableFuture<T> - это, вероятно, то, что вы хотите здесь. Он выставляет блокировку get(), а также ряд асинхронных обратных вызовов завершения, таких как thenAccept(Consumer<? super T>).

invokeAndWait, как правило, считается склонным к тупику

It зависит. Если ваш вызывающий поток не удерживает никаких блокировок, которые будут необходимы для выполнения отправляемого Runnable, вы, вероятно, в порядке. Тем не менее, если вы можете основывать свою оркестровку на асинхронных обратных вызовах, вы можете вместо этого использовать SwingUtilities.invokeLater(Runnable) - это представит выполнение вашего Runnable на событии Swing l oop без блокировки вызывающего потока.

Я бы, вероятно, избегал создания потока на синглтон. Каждый работающий поток вносит некоторые накладные расходы, и лучше отделить количество потоков от вашей бизнес-логики c. Это позволит вам настроить программное обеспечение на разные физические машины, например, на основе количества ядер.

Похоже, вам нужно, чтобы каждый метод runWithX(...) был atomi c , Другими словами, как только один поток начал обращаться к X, другой поток не сможет сделать это, пока первый поток не завершит свой шаг задачи. Если это так, то создание объекта блокировки на единицу и обеспечение доступа serial (а не параллельный ) - правильный путь к go. Вы можете достичь этого, поместив выполнение замыканий, переданных в ваши методы runWithX(...), в кодовый блок synchronized Java. Код в блоке также называется критическая секция или область монитора .

Другая вещь, которую следует учитывать, это конкуренция потоков и порядок исполнения. Если двум задачам требуется доступ к X, и задача 1 передается перед задачей 2, является ли требованием доступ задачи 1 к X раньше, чем задача 2? Подобное требование может немного усложнить конструкцию, и я, вероятно, рекомендую другой подход, чем описанный выше.

Есть ли какой-то превосходящий подход, который я не рассматривал?

В наши дни существуют рамки для решения подобных проблем. Я специально думаю о реактивных потоках и Rx Java. Хотя это очень мощная структура, она также имеет очень крутой курс обучения. Прежде чем внедрять такую ​​технологию в организации, необходимо провести много анализа и анализа.


Обновление:

Основываясь на ваших отзывах, я думаю * Подход на основе 1071 *, вероятно, имеет больше смысла.

Я бы создал вспомогательный класс для организации выполнения шага задачи:

class TaskHelper
{
    private final Object lockA;
    private final Object lockB;
    private final Object lockC;

    private final Executor poolExecutor;
    private final Executor swingExecutor;

    public TaskHelper()
    {
        poolExecutor = Executors.newFixedThreadPool( 2 );
        swingExecutor = SwingUtilities::invokeLater;

        lockA = new Object();
        lockB = new Object();
        lockC = new Object();
    }

    public <T> CompletableFuture<T> doWithA( Supplier<T> taskStep )
    {
        return doWith( lockA, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithB( Supplier<T> taskStep )
    {
        return doWith( lockB, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithC( Supplier<T> taskStep )
    {
        return doWith( lockC, swingExecutor, taskStep );
    }

    private <T> CompletableFuture<T> doWith( Object lock, Executor executor, Supplier<T> taskStep )
    {
        CompletableFuture<T> future = new CompletableFuture<>();

        Runnable serialTaskStep = () -> {

            T result;

            synchronized ( lock ) {
                result = taskStep.get();
            }

            future.complete( result );
        };

        executor.execute( serialTaskStep );
        return future;
    }
}

В моем примере выше withA и withB получить расписание для общего пула потоков, в то время как withC всегда выполняется в потоке Swing. Swing Executor уже будет последовательным по своей природе, поэтому блокировка там действительно необязательна.

Для создания реальных задач я бы рекомендовал создать объект для каждой задачи. Это позволяет вам предоставлять обратные вызовы в качестве ссылок на методы, что приводит к чистому коду и позволяет избежать ада обратного вызова:

callback hell

В этом примере вычисляется квадрат предоставленного числа в фоновом пуле потоков, а затем отображаются результаты в потоке Swing:

class SampleTask
{
    private final TaskHelper helper;
    private final String id;
    private final int startingValue;

    public SampleTask( TaskHelper helper, String id, int startingValue )
    {
        this.helper = helper;
        this.id = id;
        this.startingValue = startingValue;
    }

    private void start()
    {
        helper.doWithB( () -> {

            int square = startingValue * startingValue;
            return String.format( "computed-thread: %s computed-square: %d",
                    Thread.currentThread().getName(), square );
        } )
        .thenAccept( this::step2 );
    }

    private void step2( String result )
    {
        helper.doWithC( () -> {

            String message = String.format( "current-thread: %s task: %s result: %s",
                    Thread.currentThread().getName(), id, result );

            JOptionPane.showConfirmDialog( null, message );
            return null;
        } );
    }
}

@Test
public void testConcurrent() throws InterruptedException, ExecutionException
{
    TaskHelper helper = new TaskHelper();

    new SampleTask( helper, "task1", 5 ).start();
    new SampleTask( helper, "task2", 7 ).start();

    Thread.sleep( 60000 );
}

Обновление 2:

Если вы хотите избежать ада обратного вызова, в то же время избегая необходимости создавать объект для каждой задачи, возможно, вам стоит все-таки серьезно взглянуть на реактивные потоки.

Взгляните на «начало работы» "страница для Rx Java: https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava

Для справки вот как будет выглядеть тот же пример выше в Rx (для простоты я удаляю концепцию идентификатора задачи):

@Test
public void testConcurrentRx() throws InterruptedException
{
    Scheduler swingScheduler = Schedulers.from( SwingUtilities::invokeLater );
    Subject<Integer> inputSubject = PublishSubject.create();

    inputSubject
        .flatMap( input -> Observable.just( input )
                .subscribeOn( Schedulers.computation() )
                .map( this::computeSquare ))
        .observeOn( swingScheduler )
        .subscribe( this::displayResult );

    inputSubject.onNext( 5 );
    inputSubject.onNext( 7 );
    Thread.sleep( 60000 );
}

private String computeSquare( int input )
{
    int square = input * input;
    return String.format( "computed-thread: %s computed-square: %d",
            Thread.currentThread().getName(), square );
}

private void displayResult( String result )
{
    String message = String.format( "current-thread: %s result: %s",
            Thread.currentThread().getName(), result );

    JOptionPane.showConfirmDialog( null, message );
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...