Я довольно потерян с некоторым сложным кодом, который комбинирует запросы Neo4j (драйвер Java) с потоками.
Проблема в том, что у меня есть метод, подобный этому, в некоторый низкоуровневый компонент:
public <T> Stream<T> queryToStream ( Function<CypherClient, Stream<T>> action )
{
CypherClient client = this.newClient ();
try {
client.begin ();
return action
.apply ( client )
.onClose ( () -> { if ( client.isOpen () ) client.close (); } );
}
catch ( RuntimeException ex ) {
if ( client != null && client.isOpen () ) client.close ();
throw ex;
}
}
this.newClient()
оборачивает новый сеанс Neo4j в мой объект, client.begin()
начинает Neo4jтранзакция (только для чтения), client.close () закрывает как транзакцию, так и сеанс.
Исходный код здесь и здесь .
Я пытаюсь использовать метод с этим действием, определенным в клиенте:
protected Stream<Record> queryToStream ( String query, Value params )
{
this.checkOpen ();
StatementResult cursor = params == null
? this.tx.run ( query )
: this.tx.run ( query, params )
;
Spliterator<Record> splitr = spliteratorUnknownSize ( cursor, Spliterator.IMMUTABLE );
return StreamSupport.stream ( splitr, false );
}
, который я вызываю косвенно, следующим образом:
// findPaths() uses client.queryToStream() and wraps the result
// into another stream that maps Neo4j Record objects to a stream
// of ONDEXEntity objects
Stream<List<ONDEXEntity> paths = cyProvider.queryToStream (
cyClient -> cyClient.findPaths ( ... "MATCH path= ... RETURN path LIMIT $l SKIP n", cypherParams )
);
...
paths.close();
Итак, мойИдея состоит в том, что я возвращаю поток с нижнего уровня, чтобы лениво сканировать результаты запроса, а затем закрываю транзакцию Neo4j, отправляя paths.close()
, который перехватывается подчеркивающим методом onClose()
.Вызывается здесь (да, тем самым я упрощаю довольно более сложный код).
Проблема в том, что я выполняю несколько запросов параллельно, и вскоре я получаю сообщение об ошибке вроде: В данный момент нет доступных потоков для обслуживания этого запроса .
Интересно:
- , если я все делаю правильно,
- , если описанный выше подход можно упростить (в основном речь идет о параллельных запросах + обтекание потока).Я читал об асинхронных функциях , но не очень хорошо понял.
- если мне нужно контролировать степень параллелизма (сколько запросов нужно отправить до того, как какой-либо из них завершится и завершит свою транзакцию) и, если да, как это можно сделать (т. Е. Как узнать, что такоеэтот лимит для сервера?).