Обертывание запросов Neo4j с потоками и в многопоточном приложении - PullRequest
0 голосов
/ 05 июля 2019

Я довольно потерян с некоторым сложным кодом, который комбинирует запросы Neo4j (драйвер Java) с потоками.

Проблема в том, что у меня есть метод, подобный этому, в некоторый низкоуровневый компонент:

    public <T> Stream<T> queryToStream ( Function<CypherClient, Stream<T>> action )
    {
        CypherClient client = this.newClient ();
        try {
            client.begin ();
            return action
                .apply ( client )
                .onClose ( () -> { if ( client.isOpen () ) client.close (); } );
        }
        catch ( RuntimeException ex ) {
            if ( client != null && client.isOpen () ) client.close ();
            throw ex;
        }
    }

this.newClient() оборачивает новый сеанс Neo4j в мой объект, client.begin() начинает Neo4jтранзакция (только для чтения), client.close () закрывает как транзакцию, так и сеанс.

Исходный код здесь и здесь .

Я пытаюсь использовать метод с этим действием, определенным в клиенте:

    protected Stream<Record> queryToStream ( String query, Value params )
    {
        this.checkOpen ();
        StatementResult cursor = params == null 
            ? this.tx.run ( query )
            : this.tx.run ( query, params )             
        ;

        Spliterator<Record> splitr = spliteratorUnknownSize ( cursor, Spliterator.IMMUTABLE );
        return StreamSupport.stream ( splitr, false );      
    }

, который я вызываю косвенно, следующим образом:

// findPaths() uses client.queryToStream() and wraps the result 
// into another stream that maps Neo4j Record objects to a stream
// of ONDEXEntity objects
Stream<List<ONDEXEntity> paths = cyProvider.queryToStream (
                    cyClient -> cyClient.findPaths ( ... "MATCH path= ... RETURN path LIMIT $l SKIP n", cypherParams )
                );
    ...
    paths.close();

Итак, мойИдея состоит в том, что я возвращаю поток с нижнего уровня, чтобы лениво сканировать результаты запроса, а затем закрываю транзакцию Neo4j, отправляя paths.close(), который перехватывается подчеркивающим методом onClose().Вызывается здесь (да, тем самым я упрощаю довольно более сложный код).

Проблема в том, что я выполняю несколько запросов параллельно, и вскоре я получаю сообщение об ошибке вроде: В данный момент нет доступных потоков для обслуживания этого запроса .

Интересно:

  • , если я все делаю правильно,
  • , если описанный выше подход можно упростить (в основном речь идет о параллельных запросах + обтекание потока).Я читал об асинхронных функциях , но не очень хорошо понял.
  • если мне нужно контролировать степень параллелизма (сколько запросов нужно отправить до того, как какой-либо из них завершится и завершит свою транзакцию) и, если да, как это можно сделать (т. Е. Как узнать, что такоеэтот лимит для сервера?).
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...