Я хочу создать Observable, который может создавать ограниченное количество задач. Для этого я пытаюсь использовать службу Executor с блокирующей очередью указанного размера c. Я могу успешно проверить ожидаемое поведение исполнителя (он принимает максимум 8 параллельных задач. Контрольный пример не включен).
Но когда я пытаюсь использовать исполнителя, чтобы ограничить максимальное количество параллельных задач Я вижу, что этого не происходит. Тестовый код ниже показывает 100 немедленных звонков на s.exec(t)
.
Является ли это правильным способом ограничения наблюдаемых определенным числом максимальных задач?
Код пула потоков:
public class ThreadPool {
public static int MIN_THREAD_POOL_SIZE = 1;
public static int MAX_THREAD_POOL_SIZE = 2;
public static int MAX_QUEUE_SIZE = 5;
public static int SHUTDOWN_TIME = 30;
// When the number of threads is greater than the core, this is the maximum time
// that excess idle threads will wait for new tasks before terminating.
public static int EXCESS_THREAD_KEEPALIVE_SEC = 30;
// Rejected threads are run in the calling thread.
// This blocks the calling thread from submitting more task.
// So max tasks accepted will be (maxThreadPoolSize + maxQueueSize + 1)
//Eagerly creating thread pool at startup
public static BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(MAX_QUEUE_SIZE);
public static ExecutorService executor = new ThreadPoolExecutor(MIN_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE, EXCESS_THREAD_KEEPALIVE_SEC,
TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
}
Наблюдаемый код
public void execute(Task t, Service s) {
Observable.just(t.getId())
.doOnNext(
input -> {
logStartEvent(externalTask);
})
.map(x -> {
Thread.sleep(1000)//sleeping to simulate a long call
return s.exec(t);
})
.subscribeOn(Schedulers.from(ThreadPool.executor))
.subscribe(
(response) -> {
log.info(response.toString);
},
(error) -> {
log.error(error.toString);
}
);
}
Испытательный код
@Test
public void testMutiThread() throws InterruptedException {
for (int i = 0; i < 100; i++) {
new Thread() {
public void run() {
execute(externalTask, externalTaskService);
}
}.start();
}
}