Наблюдаемый с фиксированными максимальными задачами Парелла с использованием исполнителей - PullRequest
1 голос
/ 19 февраля 2020

Я хочу создать Observable, который может создавать ограниченное количество задач. Для этого я пытаюсь использовать службу Executor с блокирующей очередью указанного размера c. Я могу успешно проверить ожидаемое поведение исполнителя (он принимает максимум 8 параллельных задач. Контрольный пример не включен).

Но когда я пытаюсь использовать исполнителя, чтобы ограничить максимальное количество параллельных задач Я вижу, что этого не происходит. Тестовый код ниже показывает 100 немедленных звонков на s.exec(t).

Является ли это правильным способом ограничения наблюдаемых определенным числом максимальных задач?

Код пула потоков:

public class ThreadPool {

    public static int MIN_THREAD_POOL_SIZE = 1;
    public static int MAX_THREAD_POOL_SIZE = 2;
    public static int MAX_QUEUE_SIZE = 5;

    public static int SHUTDOWN_TIME = 30;

    // When the number of threads is greater than the core, this is the maximum time
    // that excess idle threads will wait for new tasks before terminating.
    public static int EXCESS_THREAD_KEEPALIVE_SEC = 30;

    // Rejected threads are run in the calling thread.
    // This blocks the calling thread from submitting more task.
    // So max tasks accepted will be (maxThreadPoolSize + maxQueueSize + 1)

    //Eagerly creating thread pool at startup
    public static BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(MAX_QUEUE_SIZE);
    public static ExecutorService executor = new ThreadPoolExecutor(MIN_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE, EXCESS_THREAD_KEEPALIVE_SEC,
            TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
}

Наблюдаемый код

public void execute(Task t, Service s) {

    Observable.just(t.getId())
        .doOnNext(
            input -> {
              logStartEvent(externalTask);
            })
        .map(x -> { 
           Thread.sleep(1000)//sleeping to simulate a long call
           return s.exec(t);
         })     
        .subscribeOn(Schedulers.from(ThreadPool.executor))
        .subscribe(
            (response) -> {
              log.info(response.toString);
            },
            (error) -> {
                log.error(error.toString);
            }
    );

  }

Испытательный код

@Test
    public void testMutiThread() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            new Thread() {
                public void run() {
                    execute(externalTask, externalTaskService);
                }
            }.start();
        }
    }
...