Я безуспешно пытаюсь выполнить потоковую передачу всех узлов из neo4j graphdb при расчете их пути с использованием драйвера nodejs и neo4j js. Я использую два запроса, первый из которых выбирает все узлы узлов и использует наблюдатель onNext , затем я пытаюсь выполнить запрос, чтобы получить путь для каждого узла и отправить sh его в буфер. Проблема заключается в том, что буфер закрывается на onComplete первого запроса и не ожидает окончания кода, выполненного в onNext .
Ошибка [ERR_STREAM_PUSH_AFTER_EOF]: stream.pu sh () после EOF
Вот мои службы, которые передают данные в буфер:
async fetchSiteMap(): Promise<Readable> {
const transaction = this.session.getSession().beginTransaction();
const buffer = new stream.Readable();
buffer._read = () => {};
await transaction.run( `
MATCH (r:Category) RETURN r.code AS code
`)
.subscribe( {
onNext: async node => {
const result = await transaction.run(`MATCH path = (:Category {code: $code})<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
RETURN nodes
ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1`, {code: node.get( 'code' )});
const records: Record[] = get(result, 'records');
if (isEmpty(records)) {
return [];
}
const newRecords = records.map(record => record.get('nodes'));
const path = compact(newRecords[0].map((segment: { properties: GraphProperty }, index: number) => {
if (index === 0) {
return;
}
return CategoryNodePath.fromGraph(<NodeGraphProperty>segment.properties);
}));
buffer.push(JSON.stringify(path));
},
onCompleted: async () => {
buffer.push(null)
transaction.commit();
this.session.close();
},
onError: () => {
buffer.push(null);
},
} );
return buffer;
}