Выполнить несколько потоков параллельно и несколько последовательно с помощью CompletableFuture - PullRequest
6 голосов
/ 13 января 2020

Мне нужно выполнить несколько задач. Некоторые из задач являются независимыми, а некоторые зависят от успешного выполнения других задач. Независимые задачи могут выполняться параллельно для повышения производительности. Я называю эти задачи услугами. В столбце link указано, какие службы будут выполняться последовательно, а какие - параллельно. Столбец order описывает порядок выполнения, за которым следует набор определенных служб. Для приведенного ниже примера службы A и B должны работать параллельно. Если они были выполнены успешно, то служба C будет выполнена. Обратите внимание, что служба C не зависит напрямую от выходных данных своих предыдущих служб, но она должна работать после успешного выполнения своих предыдущих служб, поскольку службе C потребуются некоторые данные во время выполнения, созданные ее предыдущими службами. После успешного выполнения службы C будет запущена следующая служба D, и поэтому этот цикл будет продолжаться до тех пор, пока не будут использованы все службы в списке.

Tasks       service     link      order
Service A   01          03        1
Service B   02          03        2
Service C   03          04        3
Service D   04          05        4
Service E   05          07        5
Service F   06          07        6
Service G   07          (null)    7

Ниже приведен мой код.

    public void executeTransactionFlow(DataVo dataVo) throws Exception {

    List<Callable<Boolean>> threadList = new ArrayList<>();
    List<String> serviceIds = new ArrayList<>();
    List<Future<Boolean>> futureList;
    String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;

    // Iterating through service flows map
    for (Map<String, String> map : serviceFlowsMap) {
        joinTo = map.get("link");
        serviceId = map.get("service");

        // A simple flag to differentiate which services should execute parallel and which in serial.
        if (null == prevJoinTo) {
            prevJoinTo = joinTo;
        }

        // Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
        if (null != joinTo && joinTo.equals(prevJoinTo)) {
            threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
        }

        /*
         * 1. Run the threads in the list
         * 2. Empty the thread list
         * 3. Empty serviceIds list
         * 4. Set prevJoinTo
         */
        else {
            if (threadList.size() > 0) {
                prevJoinTo = joinTo;

                try {

                    // If list contain only 1 service then call, otherwise invokeAll
                    futureList = MyExecutor.executeServices(threadList, dataVo);

                    // During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
                    if (dataVo.isTimedout()) {
                        throw new Exception("Transaction thread is Interrupted or Timed-out");
                    }

                    // Validate service response codes and get decision in case of any failure
                    validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);

                    // If validationRespCode is non 00 then do not process further
                    if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
                        break;
                    }
                }
                catch (Exception e) {
                    throw new Exception(e.getMessage(), e);
                }
                finally {
                    // clear thread list and serviceIds list. It will be populated for next parallel set of threads
                    threadList.clear();
                    serviceIds.clear();
                }
            }

            // Start preparing new thread list
            // Adding current service_id into threadList after executing previous services in parallel.
            threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
        }
    }

    // Run remaining services
    if (!threadList.isEmpty()) {

        try {
            futureList = MyExecutor.executeServices(threadList, dataVo);
            validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
        }
        catch (Throwable e) {
            throw new Exception(e.getMessage(), e);
        }
    }

    // Check validation response code
    if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
        MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
    }

}


/**
 * This method iterates through the thread list and checks for exceptions and service responses.
 * If service response is not success or if any exception has occurred then exception is thrown
 */
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
    String finalResponse = "200", serviceResponse = null;

    /*
     * future list will be null if single service is executed (no other parallel transactions). The reason is that we do
     * not use invokeAll() on single service.
     */

    if (null != futureList && futureList.size() > 0) {
        for (Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch (Exception e) {
                throw new Exception(e.getMessage(), e);
            }
        }
    }

    // Iterate through serviceIds and check responses.
    for (String serviceId : serviceIds) {
        serviceResponse = dataVo.getServiceResponse(serviceId);

        /*
         * if one of following response is found then consider it exception
         */
        if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
            throw new Exception("One of the service has been declined");
        }
    }

    return finalResponse;
}

Если CompletableFuture может быть полезным здесь, то как я могу использовать это эффективно?

И future.get() является блокирующим вызовом. В случае, если у меня есть 10 служб, которые выполняются параллельно, тогда этот future.get() будет блокировать другие, даже если они выполнялись до текущего, который мы ожидаем. Как избежать этой блокировки?

Я добавил более подробную информацию о постановке задачи, то есть добавление столбца заказа. Услуги должны следовать определенному порядку. Порядок обслуживания A и B составляет 1 и 2 соответственно, но они все равно будут выполняться параллельно, поскольку оба имеют значение 03 в link. Я думаю, что подход, основанный на графиках зависимостей, теперь не потребуется, как предложено @Thomas в комментариях.

Ответы [ 6 ]

3 голосов
/ 13 января 2020

Удивительный вопрос. Хотя, технически, безусловно, это можно сделать, используя чисто ExecutorService и Future, на мой взгляд, лучшим способом будет использование реактивного программирования, а не зависимость исключительно от Future или CompletableFuture или CompletionService и подобное, аналогичное, похожее. Основная причина в том, что он может быстро стать трудно читаемым кодом.

Вот как я это сделал, используя RxJava 2.2.16 и ExecutorService:

  1. Выполнение действий, которые не зависит от других, или все их зависимости были завершены с помощью действий ExecutorService до submit().
  2. Чтобы узнать, что действие завершено, используйте BehaviorSubject из Rx Java. Когда действие завершено, запустите шаг (1) для каждой из его зависимостей.
  3. Отключите ExecutorService, когда все действия будут завершены. Для этого используйте другой BehaviorSubject.

Извините, я написал весь лог c по-своему из-за нового подхода. Но это все еще вокруг основного требования, данного вами. Будет хорошо сначала взглянуть на класс Action модели и метод createActions() в AppRxjava. оттуда вы сможете следовать коду. Чтобы смоделировать некоторое время, я использовал знаменитую технику Thread.sleep().

public class AppRxJava{
    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();

    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();

    private ExecutorService SVC = Executors.newCachedThreadPool();
    private List<Action> allActions;

    public static void main( String[] args ){
        new AppRxJava().start();
    }

    private void start() {
        this.allActions = createActions();
        subscribeToActionCompletions();
        subscribeToSvcShutdown();

        startAllActions( this.allActions );
    }

    private void subscribeToSvcShutdown(){
        /* If all actions have been completed, shut down the ExecutorService. */
        this.allActionCompletedSub.subscribe( allScheduled -> {
            if( allScheduled ) {
                SVC.shutdown();
                try {
                    SVC.awaitTermination( 2, TimeUnit.SECONDS );
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

    private void subscribeToActionCompletions(){
        this.completionSub.subscribe( complAction -> {
            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
            List<Action> deps = getDeps( complAction, this.allActions );
            startAllActions( deps );

            /* If all actions have got completed, raise the flag. */
            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
        });
    }

    /* Attempts to start all actions that are present in the passed list. */
    private void startAllActions( List<Action> actions ){
        for( Action action : actions ) {
            startAction( action, actions );
        }
    }

    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
    private void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
                    SVC.submit( () -> {
                        try {
                            a.getAction().call();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }

                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                        this.completionSub.onNext( a );
                    } );
                }
            }
        }
    }

    private boolean allActionsCompleted(){
        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
        return true;
    }

    private static boolean allDepsCompleted( Action a, List<Action> allActions ){
        for( Action dep : allActions ) {
            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
        }

        return true;
    }

    /* Returns the actions that are dependent on Action <code>a</code>. */
    private List<Action> getDeps( Action a, List<Action> list ){
        List<Action> deps = new ArrayList<>();
        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
        return deps;
    }

    /* Creates the action list with respective dependencies. */
    private List<Action> createActions(){
        List<Action> actions = new ArrayList<>();

        Action a = createAction( 5000, "ServiceA", null );
        Action b = createAction( 5000, "ServiceB", null );
        Action c = createAction( 2000, "ServiceC", a, b );
        Action d = createAction( 2000, "ServiceD", c );
        Action e = createAction( 2000, "ServiceE", d );

        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
        return actions;
    }

    private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
        List<Action> deps = null;
        if( dependencies != null ) {
            deps = new ArrayList<>();
            for( Action a : dependencies ) deps.add( a );
        }
        return Action.of( () -> {
            System.out.println( "Service (" + name + ") started" );
            try {
                Thread.sleep( sleepMillis );
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println( "Service (" + name + ") completed" );
            return true;
        }, name, deps );
    }


}

и класс модели Action. Это представляет одно действие и список действий, от которых оно зависит. (Небольшое отличие от вашего первоначального представления. Но в любом случае все в порядке, если вы справитесь с этим, я думаю.)

public class Action{
    Callable<Boolean> action;
    String name;
    List<Action> dependencies = new ArrayList<>();
    AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed
    public static final Object LOCK = new Object();

    private Action(Callable<Boolean> action, String name, List<Action> dependencies) {
        super();
        this.action = action;
        this.name = name;
        if( dependencies != null ) this.dependencies = dependencies;
    }

    public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){
        return new Action( action, name, dependencies );
    }

    public Callable<Boolean> getAction(){
        return action;
    }

    public String getName(){
        return name;
    }

    public List<Action> getDependencies(){
        return dependencies;
    }

    public boolean isCompleted(){
        return this.status.get() == 2;
    }

    public boolean isPending(){
        return this.status.get() == 0;
    }

    public boolean isScheduled(){
        return this.status.get() == 1;
    }

    public void setStatus( int status ){
        this.status.getAndSet( status );
    }

    @Override
    public int hashCode(){
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals( Object obj ){
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Action other = (Action) obj;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equalsIgnoreCase( other.name )) return false;
        return true;
    }

}
1 голос
/ 14 января 2020

Итак, одна простая вещь, которую вы можете сделать, - это обернуть ваши службы таким образом, чтобы сделать управление намного проще,

  • , предоставив имя
  • , ведя список предварительных условий для его запуска и
  • сообщает вам, когда завершится сам запуск службы (чтобы можно было проверить предварительные условия других служб)

Вот как может выглядеть такой класс:

class Service implements Runnable {

    private final Runnable wrappedRunnable;
    private final String name;
    private final List<String> preconditions = new CopyOnWriteArrayList<>();
    private final Consumer<String> finishedNotification;

    Service(Runnable r, String name, Consumer<String> finishedNotification, String... preconditions) {
        this.wrappedRunnable = r;
        this.name = name;
        this.finishedNotification = finishedNotification;
        this.preconditions.addAll(Arrays.asList(preconditions));
    }

    @Override
    public void run() {
        wrappedRunnable.run();
        finishedNotification.accept(name);
    }

    void preconditionFulfilled(String precondition) {
        preconditions.remove(precondition);
    }
    boolean arePreconditionsFulfilled() {
        return preconditions.isEmpty();
    }
}

Аргумент Runnable - это фактический сервисный вызов, который вы хотите обернуть, имя - это информация управления в вашей таблице («Служба A», или «01» или что вы хотите), а preconditions - это имена других сервисов, которые должны быть завершены до того, как это будет выполнено.

Теперь вам все еще нужен менеджер для ведения списка всех сервисов - об этом также необходимо уведомлять после завершения вызова сервиса , Это может быть довольно просто, если вести простой список сервисов.

class CallManager {
    List<Service> services = new ArrayList<>();
    ExecutorService executorService = Executors.newFixedThreadPool(4);

    void addService(Runnable r, String serviceName, String... preconditions) {
        services.add(new Service(r, serviceName, this::preconditionFulfilled, preconditions));
    }

    void run() {
        for (Iterator<Service> serviceIterator = services.iterator(); serviceIterator.hasNext(); ) {
            Service service = serviceIterator.next();
            if (service.arePreconditionsFulfilled()) {
                executorService.submit(service);
                serviceIterator.remove();
            }
        }
        if (services.isEmpty()) {
            executorService.shutdown();
        }
    }

    private synchronized void preconditionFulfilled(String name) {
        System.out.printf("service %s finished%n", name);
        for (Iterator<Service> serviceIterator = services.iterator(); serviceIterator.hasNext(); ) {
            Service service = serviceIterator.next();
            service.preconditionFulfilled(name);
            if (service.arePreconditionsFulfilled()) {
                executorService.submit(service);
                serviceIterator.remove();
            }
        }
        if (services.isEmpty()) {
            executorService.shutdown();
        }
    }
}

Итак, сначала вам нужно добавить все сервисы, которые вы хотите запустить, этому менеджеру, а затем вызвать run() на нем * 1028. * вся цепочка исполнения. Вот как это может выглядеть для вашего примера:

class Bootstrap {
    private static final Random RANDOM = new Random();
    public static void main(String[] args) {
        CallManager manager = new CallManager();
        manager.addService(simpleRunnable("A"), "A");
        manager.addService(simpleRunnable("B"), "B");
        manager.addService(simpleRunnable("C"), "C", "A", "B");
        manager.addService(simpleRunnable("D"), "D", "C");
        manager.addService(simpleRunnable("E"), "E", "D");
        manager.addService(simpleRunnable("F"), "F");
        manager.addService(simpleRunnable("G"), "G", "E", "F");

        manager.run();
    }

    // create some simple pseudo service
    private static Runnable simpleRunnable(String s) {
        return () -> {
            System.out.printf("running service %s%n", s);
            try {
                Thread.sleep(RANDOM.nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}

Вы можете увидеть это работает здесь .

1 голос
/ 14 января 2020

Просто не мог отвлечься от базового c вопроса о том, чтобы делать это с чистым Java. Итак, вот модифицированная версия моего более раннего ответа. Этот ответ содержит оба стиля - RxJava и ExecutorService. Он содержит 3 класса:

  1. DependentSeriesOfActionsBase : базовый класс, содержащий несколько повторно используемых методов и общие поля. Это просто для удобства и легкого понимания кода.
  2. DependentSeriesOfActionsCore Java: Это реализация на основе ExecutorService. Я использую Future.get() для ожидания результатов действия, с разница в том, что само ожидание происходит асинхронно . Взгляните на startAction().
  3. DependentSeriesOfActionsRx Java: более ранняя реализация на основе RxJava.

Code : DependentSeriesOfActionsBase

abstract class DependentSeriesOfActionsBase{
    protected List<Action> allActions;
    protected ExecutorService SVC = Executors.newCachedThreadPool();

    protected boolean allActionsCompleted(){
        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
        return true;
    }

    protected static boolean allDepsCompleted( Action a, List<Action> allActions ){
        for( Action dep : allActions ) {
            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
        }

        return true;
    }

    /* Returns the actions that are dependent on Action <code>a</code>. */
    protected List<Action> getDeps( Action a, List<Action> list ){
        List<Action> deps = new ArrayList<>();
        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
        return deps;
    }

    /* Creates the action list with respective dependencies. */
    protected List<Action> createActions(){
        List<Action> actions = new ArrayList<>();

        Action a = createAction( 5000, "ServiceA", null );
        Action b = createAction( 5000, "ServiceB", null );
        Action c = createAction( 2000, "ServiceC", a, b );
        Action d = createAction( 2000, "ServiceD", c );
        Action e = createAction( 2000, "ServiceE", d );

        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
        return actions;
    }

    protected Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
        List<Action> deps = null;
        if( dependencies != null ) {
            deps = new ArrayList<>();
            for( Action a : dependencies ) deps.add( a );
        }
        return Action.of( () -> {
            System.out.println( "Service (" + name + ") started" );
            try {
                Thread.sleep( sleepMillis );
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println( "Service (" + name + ") completed" );
            return true;
        }, name, deps );
    }

    /* Attempts to start all actions that are present in the passed list. */
    protected void startAllActions( List<Action> actions ){
        for( Action action : actions ) {
            startAction( action, actions );
        }
    }

    protected abstract void startAction( Action action, List<Action> actions );


    protected void shutdown(){
        SVC.shutdown();
        try {
            SVC.awaitTermination( 2, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

Код : DependentSeriesOfActionsCoreJava

public class DependentSeriesOfActionsCoreJava extends DependentSeriesOfActionsBase{
    public static void main( String[] args ){
        new DependentSeriesOfActionsCoreJava().start();
    }

    private void start() {
        this.allActions = createActions();
        startAllActions( this.allActions );
    }

    protected void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 

                    /* Submit the action to the ExecutorService and get the handle to the Future. */
                    final Future<?> fut = SVC.submit( () -> a.action.call() );

                    /* Submit the Future.get() action to the ExecutorService and execute the dependencies when it returns. */
                    SVC.submit( () -> {
                        Object returnVal = null;
                        /* Wait */
                        try {
                            fut.get(); 
                            a.setStatus( 2 );

                            /* If all actions are completed, shut down the ExecutorService. */
                            if( allActionsCompleted() ) shutdown();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }

                        startAllActions( getDeps( a, this.allActions ) );
                    } );

                }
            }
        }
    }
}

Код : DependentSeriesOfActionsRxJava

public class DependentSeriesOfActionsRxJava extends DependentSeriesOfActionsBase{
    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();

    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();

    public static void main( String[] args ){
        new DependentSeriesOfActionsRxJava().start();
    }

    private void start() {
        this.allActions = createActions();
        subscribeToActionCompletions();
        subscribeToSvcShutdown();

        startAllActions( this.allActions );
    }

    private void subscribeToSvcShutdown(){
        /* If all actions have been completed, shut down the ExecutorService. */
        this.allActionCompletedSub.subscribe( allScheduled -> {
            if( allScheduled ) shutdown();
        });
    }

    private void subscribeToActionCompletions(){
        this.completionSub.subscribe( complAction -> {
            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
            List<Action> deps = getDeps( complAction, this.allActions );
            startAllActions( deps );

            /* If all actions have got completed, raise the flag. */
            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
        });
    }

    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
    protected void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
                    SVC.submit( () -> {
                        try {
                            a.getAction().call();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }

                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                        this.completionSub.onNext( a );
                    } );
                }
            }
        }
    }

}
1 голос
/ 13 января 2020

thenCombine может использоваться для express зависимостей между CompletionStages, позволяя вам выполнить задачу после выполнения обоих. Затем вы можете выполнить последующие действия с помощью thenApply :

CompletionStage<ServiceAResponse> serviceAResponse = callServiceA();
CompletionStage<ServiceBResponse> serviceBResponse = callServiceB();


CompletionStage<ServiceEResponse> result = serviceA.thenCombine(serviceBResponse, (aResponse, bResponse) -> serviceC.call(aResponse, bResponse))                                                     
                                         .thenApply(cResponse -> serviceD.call(cResponse))                                                    
                                         .thenApply(dResponse -> serviceE.call(eResponse))



public CompletionStage<ServiceAResponse> callServiceA() {
    return CompletableFuture.supplyAsync(() -> serviceA.call());
}

public CompletionStage<ServiceBResponse> callServiceB() {
    return CompletableFuture.supplyAsync(() -> serviceB.call());
}
0 голосов
/ 14 января 2020

вы не сказали, если для задачи C, которая выполняется после задач A и B, нужны результаты A и B. Предположим, что это так (если нет, решение будет еще короче). Тогда решение с использованием библиотеки DF4J может выглядеть примерно так:

Class A extends AsyncProc {
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       ...
       res.onSuccess(value);
   }
}
Class B extends AsyncProc {
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       ...
       res.onSuccess(value);
   }
}
Class C extends AsyncProc {
   InpScalar<T> param1 = new InpScalar<>(this);
   InpScalar<T> param2 = new InpScalar<>(this);
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       value = ... param1.current() ...param2.current()...
       res.onSuccess(value);
   }
}
Class D extends AsyncProc {
   InpScalar<T> param = new InpScalar<>(this);
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       value = ... param.current()
       res.onSuccess(value);
   }
}
Class E extends AsyncProc {
   InpScalar<T> param = new InpScalar<>(this);
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       value = ... param.current()
       res.onSuccess(value);
   }
}

Таким образом, мы объявили асинхронные процедуры с различным количеством параметров. Затем мы создаем экземпляры и соединяем их в графе потока данных:

 A a = new A(); a.start();
 B b = new A(); b.start();
 C c = new A(); c.start();
 D d = new A(); d.start();
 E e = new A(); e.start();
 a.res.subscribe(c.param1);
 b.res.subscribe(c.param2);
 c.res.subscribe(d.param);
 d.res.subscribe(e.param);

Наконец, дождемся результата последнего асинхронного c pro c синхронно:

 T result = e.res.get();
0 голосов
/ 13 января 2020

Я не прошел весь ваш код, поскольку он выглядит в основном как управление параллелизмом, но общая концепция, которую вы можете реализовать с помощью

CompletableFuture.allOf(CompletableFuture.runAsync(serviceA::run),
                        CompletableFuture.runAsync(serviceB::run))
                 .thenRun(serviceC::run)
                 .thenRun(serviceD::run)
                 .thenRun(serviceE::run);

Это создает зависимости в вашей таблице (при условии, что все службы реализовать Runnable); он не будет ждать завершения всего процесса до 100 * * (что можно сделать, добавив .join() или .get()).

...