я пытаюсь ограничить ни один из потоков планировщика, которые могут быть использованы API - PullRequest
0 голосов
/ 13 октября 2019

У меня есть Api, который выполняет какую-то долгую задачу, поэтому я хочу использовать планировщики, которые могут выполнять задачи параллельно, но этот Api может вызываться одновременно несколькими пользователями, поэтому я хочу ограничить потоки из планировщика, каждый Api можетиспользовать. Я вроде как написал базовый код для этого, но так как он не был тщательно протестирован, я не уверен, есть ли какая-либо неожиданная ошибка, которую я мог бы сделать.

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }
  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

Также я только что узнал оошибка в scheduleThreadPoolExecutor, касающаяся future.cancel (true), которая говорит о том, что даже после отмены используемых ресурсов памяти не освобождается до тех пор, пока не истечет срок задачи. ref: - проверить соответствующую ошибку в ScheduledThreadPoolExecutor Во-вторых, в худшем случае scenerio, когда каждая задача заблокирована, тогда эта функция будет выполнена в CollectionSize * timeToCompleteEachTask. Я не уверен, что эти проблемы повлияют на мою реализацию, если вы думаете о любой проблеме в этом коде, которая может возникнуть, пожалуйста, прокомментируйте здесь и предоставьте соответствующую ссылку, если это возможно.

...