Как отклонить вызовы, если пользовательская очередь пула потоков заполнена? - PullRequest
0 голосов
/ 29 августа 2018

Как с помощью Play Framework 2.5 заставить пользовательский пул потоков отклонять задачи, если пул и очередь полностью заняты?

Использование следующей конфигурации стандартных и пользовательских пулов потоков.

akka {
  actor {
    default-dispatcher {
      throughput=1
      executor="thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size=off
        core-pool-size=5
        max-pool-size=10
        task-queue-size=5
      }
    }
  }
}

contexts {
  custom {
    executor = "thread-pool-executor"
    throughput = 1
    thread-pool-executor {
      fixed-pool-size=10
      task-queue-size=1
    }
  }
}

И следующий контроллер:

import akka.actor.ActorSystem;
import play.Logger;
import play.mvc.Result;
import scala.concurrent.ExecutionContextExecutor;

import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static play.mvc.Results.ok;

public class TestController {
  private final ExecutionContextExecutor executor;

  @Inject
  public TestController(ActorSystem actorSystem) {
    this.executor = actorSystem.dispatchers().lookup("contexts.custom");
  }

  public CompletionStage<Result> call() {
    return CompletableFuture.supplyAsync(this::task, executor);
  }

  private Result task() {
    Logger.info("Task started");
    sleep(5000);
    Logger.info("----- Task completed ----- ");
    return ok("ok");
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (Exception e) {

    }
  }
}

Я ожидаю, что после 10 запросов они начнут отклоняться пользовательским пулом потоков, но вместо этого они выполняются в пуле по умолчанию.

[application-contexts.custom-14] INFO  application - Task started
[application-contexts.custom-16] INFO  application - Task started
[application-contexts.custom-15] INFO  application - Task started
[application-contexts.custom-20] INFO  application - Task started
[application-contexts.custom-19] INFO  application - Task started
[application-contexts.custom-17] INFO  application - Task started
[application-contexts.custom-21] INFO  application - Task started
[application-contexts.custom-18] INFO  application - Task started
[application-contexts.custom-22] INFO  application - Task started
[application-contexts.custom-23] INFO  application - Task started
[application-akka.actor.default-dispatcher-7] INFO  application - Task started
[application-akka.actor.default-dispatcher-10] INFO  application - Task started
[application-akka.actor.default-dispatcher-6] INFO  application - Task started
[application-akka.actor.default-dispatcher-9] INFO  application - Task started
[application-akka.actor.default-dispatcher-4] INFO  application - Task started
[application-akka.actor.default-dispatcher-12] INFO  application - Task started
[application-akka.actor.default-dispatcher-13] INFO  application - Task started
[application-akka.actor.default-dispatcher-2] INFO  application - Task started
[application-akka.actor.default-dispatcher-8] INFO  application - Task started
[application-akka.actor.default-dispatcher-5] INFO  application - Task started
[application-akka.actor.default-dispatcher-3] INFO  application - Task started
[application-akka.actor.default-dispatcher-11] INFO  application - Task started

Это означает, что пул по умолчанию будет занят и не будет обрабатывать другие запросы, пока не выполнит все тяжелые задачи. Я хочу, чтобы мой пользовательский пул отклонял задачи, если пул заполнен, а очередь заполнена. Это возможно?

1 Ответ

0 голосов
/ 03 сентября 2018

Вы можете создать свой контекст выполнения программно с желаемой логикой, например, с помощью ExecutionContext.fromExecutor(new ThreadPoolExecutor(...)). Вы также можете реализовать свой собственный akka.dispatch.ExecutorServiceConfigurator и указать его полное имя класса в конфигурации: здесь документы

my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "<Class name of the configurator>"

}
...