Я думаю, что есть несколько способов обращения к неограниченному пулу потоков. Одним из них является, как указывают другие, создание планировщика Rx Java из исполнителя, поддерживаемого ограниченным пулом потоков. Это довольно просто и вполне может быть лучшим подходом.
Но я хочу отметить, что операторы "распараллеливания" Rx Java (flatMap, concatMapEager) также имеют дополнительный оператор maxConcurrency, который позволяет нам отделить число дорожек в данном конвейере Rx от планировщика, используемого для его выполнения.
Вот гипотетический пример, скажем, у нас есть объект доступа к данным, который выполняет блокирующие запросы. В этом случае он просто спит в течение 1 секунды и возвращает сам запрос с добавленной отметкой времени:
public class MyDao
{
public Object blockingGetData( String query ) throws InterruptedException
{
Thread.sleep( 1000 );
return query.toUpperCase() + " - " + new Date().toString();
}
}
Далее, давайте обернем DAO в асинхронную службу c, которая поддерживает конвейер Rx, где каждый элемент представляет запрос и его асин c результат:
public class MyService
{
private class QueryHolder
{
final String query;
final Subject<Object> result;
public QueryHolder( String query, Subject<Object> result )
{
this.query = query;
this.result = result;
}
}
private static final int MAX_CONCURRENCY = 2;
private final Subject<QueryHolder> querySubject;
private final MyDao dao;
public MyService()
{
dao = new MyDao();
querySubject = PublishSubject.<QueryHolder>create().toSerialized();
querySubject
.flatMap(
// For each element in the pipeline, perform blocking
// get on IO Scheduler, populating the result Subject:
queryHolder -> Observable.just( queryHolder )
.subscribeOn( Schedulers.io() )
.doOnNext( __ -> {
Object data = dao.blockingGetData( queryHolder.query );
queryHolder.result.onNext( data );
queryHolder.result.onComplete();
} ),
// With max concurrency limited:
MAX_CONCURRENCY )
.subscribe();
}
public Single<Object> getData( String query )
{
Subject<Object> result = AsyncSubject.create();
// Emit pipeline element:
querySubject.onNext( new QueryHolder( query, result ));
return result.firstOrError();
}
}
Я рекомендую Google различные типы и операторы тем и т. д., c. - доступно множество документов.
Простой ручной тест:
@Test
public void testService() throws InterruptedException
{
MyService service = new MyService();
// Issue 20 queries immediately, printing the results when they complete:
for ( int i = 0; i < 20; i++ )
{
service.getData( "query #" + i )
.subscribe( System.out::println );
}
// Sleep:
Thread.sleep( 11000 );
}
Вывод:
QUERY #0 - Wed Mar 11 11:08:21 EDT 2020
QUERY #1 - Wed Mar 11 11:08:21 EDT 2020
QUERY #2 - Wed Mar 11 11:08:22 EDT 2020
QUERY #3 - Wed Mar 11 11:08:22 EDT 2020
QUERY #4 - Wed Mar 11 11:08:23 EDT 2020
QUERY #5 - Wed Mar 11 11:08:23 EDT 2020
QUERY #6 - Wed Mar 11 11:08:24 EDT 2020
QUERY #7 - Wed Mar 11 11:08:24 EDT 2020
QUERY #8 - Wed Mar 11 11:08:25 EDT 2020
QUERY #9 - Wed Mar 11 11:08:25 EDT 2020
QUERY #10 - Wed Mar 11 11:08:26 EDT 2020
QUERY #11 - Wed Mar 11 11:08:26 EDT 2020
QUERY #12 - Wed Mar 11 11:08:27 EDT 2020
QUERY #13 - Wed Mar 11 11:08:27 EDT 2020
QUERY #14 - Wed Mar 11 11:08:28 EDT 2020
QUERY #15 - Wed Mar 11 11:08:28 EDT 2020
QUERY #16 - Wed Mar 11 11:08:29 EDT 2020
QUERY #17 - Wed Mar 11 11:08:29 EDT 2020
QUERY #18 - Wed Mar 11 11:08:30 EDT 2020
QUERY #19 - Wed Mar 11 11:08:30 EDT 2020