Как получить масштабируемую многопоточность asyn c для ввода-вывода из Java SDK или аналогичного SDK (ie: rx Java, проектный реактор)? - PullRequest
0 голосов
/ 02 марта 2020

Хочу

Множество потоков, которые будут выполнять вызов и блокировку базы данных для улучшения и масштабирования производительности.

Проблемы:

  1. Стандарт Java завершаемый в будущем API плохо работает с задачами блокировки / ввода-вывода, даже при использовании ManagedBlocker.
  2. Если вы используете библиотеку, у которой нет этой проблемы, слишком много асин c запросов на в то же время существует как минимум 1 проблема с масштабированием:
    • Слишком много потоков , созданных одновременно, может привести к ошибке нехватки памяти из-за того, сколько памяти требуется каждому потоку. И нет хороших по умолчанию ThreadPoolExecutors, которые позволяют устанавливать параметры пула потоков, такие как максимальное количество потоков, а затем предоставлять систему очереди для входящих задач, ожидающих, пока поток не станет доступным.

Пример

Я хочу масштабировать программу, которая должна будет выполнить 3000 асин c дБ запросов. Вместо того, чтобы делать 3000 запросов одновременно, я хочу ограничить его до 50 в любой момент времени и поставить в очередь оставшиеся 2950, ​​а затем обрабатывать каждый 2950 из оставшихся за раз, когда задача завершается. В идеале я хотел бы сделать это, используя существующие библиотеки, чтобы заново изобрести его с новым пользовательским кодом, поскольку я предполагаю, что есть способ сделать это, но я не уверен, как использовать API различных asyn c Java SDK, которые продолжают выходить.

1 Ответ

0 голосов
/ 11 марта 2020

Я думаю, что есть несколько способов обращения к неограниченному пулу потоков. Одним из них является, как указывают другие, создание планировщика Rx Java из исполнителя, поддерживаемого ограниченным пулом потоков. Это довольно просто и вполне может быть лучшим подходом.

Но я хочу отметить, что операторы "распараллеливания" Rx Java (flatMap, concatMapEager) также имеют дополнительный оператор maxConcurrency, который позволяет нам отделить число дорожек в данном конвейере Rx от планировщика, используемого для его выполнения.

Вот гипотетический пример, скажем, у нас есть объект доступа к данным, который выполняет блокирующие запросы. В этом случае он просто спит в течение 1 секунды и возвращает сам запрос с добавленной отметкой времени:

public class MyDao
{
    public Object blockingGetData( String query ) throws InterruptedException
    {
        Thread.sleep( 1000 );
        return query.toUpperCase() + " - " + new Date().toString();
    }
}

Далее, давайте обернем DAO в асинхронную службу c, которая поддерживает конвейер Rx, где каждый элемент представляет запрос и его асин c результат:

public class MyService
{
    private class QueryHolder
    {
        final String query;
        final Subject<Object> result;

        public QueryHolder( String query, Subject<Object> result )
        {
            this.query = query;
            this.result = result;
        }
    }

    private static final int MAX_CONCURRENCY = 2;
    private final Subject<QueryHolder> querySubject;
    private final MyDao dao;

    public MyService()
    {
        dao = new MyDao();
        querySubject = PublishSubject.<QueryHolder>create().toSerialized();

        querySubject
            .flatMap(
                    // For each element in the pipeline, perform blocking
                    // get on IO Scheduler, populating the result Subject:
                    queryHolder -> Observable.just( queryHolder )
                        .subscribeOn( Schedulers.io() )
                        .doOnNext( __ -> {
                            Object data = dao.blockingGetData( queryHolder.query );
                            queryHolder.result.onNext( data );
                            queryHolder.result.onComplete();
                        } ),
                    // With max concurrency limited:
                    MAX_CONCURRENCY )
            .subscribe();
    }

    public Single<Object> getData( String query )
    {
        Subject<Object> result = AsyncSubject.create();

        // Emit pipeline element:
        querySubject.onNext( new QueryHolder( query, result ));

        return result.firstOrError();
    }
}

Я рекомендую Google различные типы и операторы тем и т. д., c. - доступно множество документов.

Простой ручной тест:

@Test
public void testService() throws InterruptedException
{
    MyService service = new MyService();

    // Issue 20 queries immediately, printing the results when they complete:
    for ( int i = 0; i < 20; i++ )
    {
        service.getData( "query #" + i )
            .subscribe( System.out::println );
    }

    // Sleep:
    Thread.sleep( 11000 );
}

Вывод:

QUERY #0 - Wed Mar 11 11:08:21 EDT 2020
QUERY #1 - Wed Mar 11 11:08:21 EDT 2020
QUERY #2 - Wed Mar 11 11:08:22 EDT 2020
QUERY #3 - Wed Mar 11 11:08:22 EDT 2020
QUERY #4 - Wed Mar 11 11:08:23 EDT 2020
QUERY #5 - Wed Mar 11 11:08:23 EDT 2020
QUERY #6 - Wed Mar 11 11:08:24 EDT 2020
QUERY #7 - Wed Mar 11 11:08:24 EDT 2020
QUERY #8 - Wed Mar 11 11:08:25 EDT 2020
QUERY #9 - Wed Mar 11 11:08:25 EDT 2020
QUERY #10 - Wed Mar 11 11:08:26 EDT 2020
QUERY #11 - Wed Mar 11 11:08:26 EDT 2020
QUERY #12 - Wed Mar 11 11:08:27 EDT 2020
QUERY #13 - Wed Mar 11 11:08:27 EDT 2020
QUERY #14 - Wed Mar 11 11:08:28 EDT 2020
QUERY #15 - Wed Mar 11 11:08:28 EDT 2020
QUERY #16 - Wed Mar 11 11:08:29 EDT 2020
QUERY #17 - Wed Mar 11 11:08:29 EDT 2020
QUERY #18 - Wed Mar 11 11:08:30 EDT 2020
QUERY #19 - Wed Mar 11 11:08:30 EDT 2020
...