Мне нужно выполнить несколько задач. Некоторые из задач являются независимыми, а некоторые зависят от успешного выполнения других задач. Независимые задачи могут выполняться параллельно для повышения производительности. Я называю эти задачи услугами. В столбце link
указано, какие службы будут выполняться последовательно, а какие - параллельно. Столбец order
описывает порядок выполнения, за которым следует набор определенных служб. Для приведенного ниже примера службы A и B должны работать параллельно. Если они были выполнены успешно, то служба C будет выполнена. Обратите внимание, что служба C не зависит напрямую от выходных данных своих предыдущих служб, но она должна работать после успешного выполнения своих предыдущих служб, поскольку службе C потребуются некоторые данные во время выполнения, созданные ее предыдущими службами. После успешного выполнения службы C будет запущена следующая служба D, и поэтому этот цикл будет продолжаться до тех пор, пока не будут использованы все службы в списке.
Tasks service link order
Service A 01 03 1
Service B 02 03 2
Service C 03 04 3
Service D 04 05 4
Service E 05 07 5
Service F 06 07 6
Service G 07 (null) 7
Ниже приведен мой код.
public void executeTransactionFlow(DataVo dataVo) throws Exception {
List<Callable<Boolean>> threadList = new ArrayList<>();
List<String> serviceIds = new ArrayList<>();
List<Future<Boolean>> futureList;
String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;
// Iterating through service flows map
for (Map<String, String> map : serviceFlowsMap) {
joinTo = map.get("link");
serviceId = map.get("service");
// A simple flag to differentiate which services should execute parallel and which in serial.
if (null == prevJoinTo) {
prevJoinTo = joinTo;
}
// Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
if (null != joinTo && joinTo.equals(prevJoinTo)) {
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
/*
* 1. Run the threads in the list
* 2. Empty the thread list
* 3. Empty serviceIds list
* 4. Set prevJoinTo
*/
else {
if (threadList.size() > 0) {
prevJoinTo = joinTo;
try {
// If list contain only 1 service then call, otherwise invokeAll
futureList = MyExecutor.executeServices(threadList, dataVo);
// During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
if (dataVo.isTimedout()) {
throw new Exception("Transaction thread is Interrupted or Timed-out");
}
// Validate service response codes and get decision in case of any failure
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
// If validationRespCode is non 00 then do not process further
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
break;
}
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
finally {
// clear thread list and serviceIds list. It will be populated for next parallel set of threads
threadList.clear();
serviceIds.clear();
}
}
// Start preparing new thread list
// Adding current service_id into threadList after executing previous services in parallel.
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
}
// Run remaining services
if (!threadList.isEmpty()) {
try {
futureList = MyExecutor.executeServices(threadList, dataVo);
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
}
catch (Throwable e) {
throw new Exception(e.getMessage(), e);
}
}
// Check validation response code
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
}
}
/**
* This method iterates through the thread list and checks for exceptions and service responses.
* If service response is not success or if any exception has occurred then exception is thrown
*/
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
String finalResponse = "200", serviceResponse = null;
/*
* future list will be null if single service is executed (no other parallel transactions). The reason is that we do
* not use invokeAll() on single service.
*/
if (null != futureList && futureList.size() > 0) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
}
}
// Iterate through serviceIds and check responses.
for (String serviceId : serviceIds) {
serviceResponse = dataVo.getServiceResponse(serviceId);
/*
* if one of following response is found then consider it exception
*/
if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
throw new Exception("One of the service has been declined");
}
}
return finalResponse;
}
Если CompletableFuture
может быть полезным здесь, то как я могу использовать это эффективно?
И future.get()
является блокирующим вызовом. В случае, если у меня есть 10 служб, которые выполняются параллельно, тогда этот future.get()
будет блокировать другие, даже если они выполнялись до текущего, который мы ожидаем. Как избежать этой блокировки?
Я добавил более подробную информацию о постановке задачи, то есть добавление столбца заказа. Услуги должны следовать определенному порядку. Порядок обслуживания A и B составляет 1 и 2 соответственно, но они все равно будут выполняться параллельно, поскольку оба имеют значение 03
в link
. Я думаю, что подход, основанный на графиках зависимостей, теперь не потребуется, как предложено @Thomas в комментариях.