У меня есть Api, который выполняет какую-то долгую задачу, поэтому я хочу использовать планировщики, которые могут выполнять задачи параллельно, но этот Api может вызываться одновременно несколькими пользователями, поэтому я хочу ограничить потоки из планировщика, каждый Api можетиспользовать. Я вроде как написал базовый код для этого, но так как он не был тщательно протестирован, я не уверен, есть ли какая-либо неожиданная ошибка, которую я мог бы сделать.
public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
Map<K,V> context, Task<T,S,K,V> someTask){
if(threadPoolExecutor==null){
return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
}
if(someTask==null){
return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
}
if(CollectionUtils.isEmpty(collection)){
return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
}
LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
collection.forEach(value -> {
callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
});
LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();
int count = 0;
while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
futures.offer(f);
count++;
}
Collection<ResponseObject<T>> responseCollection = new ArrayList<>();
while(futures.size()>0){
Future<T> future = futures.poll();
ResponseObject<T> responseObject = null;
try {
T response = future.get(timeToCompleteEachTask, timeUnit);
responseObject = ResponseObject.<T>builder().data(response).build();
} catch (InterruptedException e) {
future.cancel(true);
} catch (ExecutionException e) {
future.cancel(true);
} catch (TimeoutException e) {
future.cancel(true);
} finally {
if (Objects.nonNull(responseObject)) {
responseCollection.add(responseObject);
}
futures.remove(future);//remove this
Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
if(null!=callable){
Future<T> f = threadPoolExecutor.submit(callable);
futures.add(f);
}
}
}
return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
}
private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
if(callableLinkedBlockingQueue.size()>0){
return callableLinkedBlockingQueue.poll();
}
return null;
}
Также я только что узнал оошибка в scheduleThreadPoolExecutor, касающаяся future.cancel (true), которая говорит о том, что даже после отмены используемых ресурсов памяти не освобождается до тех пор, пока не истечет срок задачи. ref: - проверить соответствующую ошибку в ScheduledThreadPoolExecutor Во-вторых, в худшем случае scenerio, когда каждая задача заблокирована, тогда эта функция будет выполнена в CollectionSize * timeToCompleteEachTask. Я не уверен, что эти проблемы повлияют на мою реализацию, если вы думаете о любой проблеме в этом коде, которая может возникнуть, пожалуйста, прокомментируйте здесь и предоставьте соответствующую ссылку, если это возможно.