Почему IoScheduler, использующий ScheduledExecutorService с poolCoreSize, равен 1? - PullRequest
0 голосов
/ 20 июня 2019

Я обнаружил, что IoScheduler.createWorker () немедленно создаст NewThreadWorker, если нет кэшированного NewThreadWorker , Это может привести к OutOfMemoryError.

Если я добавлю 1000 единиц работы в IoScheduler единовременно ,, это создаст 1000 единиц NewThreadWorker и ScheduledExecutorService.

private void submitWorkers(int workerCount) {
    for (int i = 0; i < workerCount; i++) {
        Single.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return "String-call(): " + Thread.currentThread().hashCode();
            }
        })
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        // TODO
                    }
                });
    }
}

Если я установил workerCount с 1000, я получил OutOfMemoryError , Я хочу знать, почему IoScheduler использует NewThreadWorker с ScheduledExecutorService, но просто выполняет одну работу。 Каждый раз, когда приходит новая работа, она создает NewThreadWorker и ScheduledExecutorService, если нет кэшированного NewThreadWorker , Почему он предназначен для такого процесса?

1 Ответ

1 голос
/ 20 июня 2019

Каждый из стандартных рабочих RxJava использует выделенный поток, чтобы избежать чрезмерного скачкообразного изменения потока и миграции работы в потоках.

Стандартный планировщик ввода-вывода использует неограниченное количество рабочих потоков, поскольку его основное назначение - разрешить блокирующим операциям блокировать рабочий поток, в то время как другие операции могут начинаться в других рабочих потоках.Отличие от newThread заключается в том, что после повторного использования рабочего процесса во внутреннем пуле разрешено повторное использование потока.

Если бы количество потоков было ограничено, это резко увеличило бы вероятность взаимоблокировок из-за ресурсаистощение.Кроме того, в отличие от планировщика вычислений, для этого предела нет подходящего номера по умолчанию: 1, 10, 100, 1000?

Существует несколько способов решения этой проблемы, например:

  • используйте Schedulers.from() с произвольным ExecutorService, который вы можете ограничить и настроить по своему усмотрению,
  • используйте ParallelScheduler из проекта Extensions иопределить произвольно большой, но фиксированный пул работников.
...