Результат запроса потока neo4j в той же транзакции - PullRequest
0 голосов
/ 09 января 2020

Я безуспешно пытаюсь выполнить потоковую передачу всех узлов из neo4j graphdb при расчете их пути с использованием драйвера nodejs и neo4j js. Я использую два запроса, первый из которых выбирает все узлы узлов и использует наблюдатель onNext , затем я пытаюсь выполнить запрос, чтобы получить путь для каждого узла и отправить sh его в буфер. Проблема заключается в том, что буфер закрывается на onComplete первого запроса и не ожидает окончания кода, выполненного в onNext .

Ошибка [ERR_STREAM_PUSH_AFTER_EOF]: stream.pu sh () после EOF

Вот мои службы, которые передают данные в буфер:

async fetchSiteMap(): Promise<Readable> {
const transaction = this.session.getSession().beginTransaction();
const buffer = new stream.Readable();
buffer._read = () => {};
await transaction.run( `
  MATCH (r:Category) RETURN r.code AS code
  `)
  .subscribe( {
    onNext: async node => {
      const result = await transaction.run(`MATCH path = (:Category {code: $code})<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
          WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
          RETURN nodes
          ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1`, {code: node.get( 'code' )});

      const records: Record[] = get(result, 'records');
      if (isEmpty(records)) {
        return [];
      }

      const newRecords = records.map(record => record.get('nodes'));

      const path = compact(newRecords[0].map((segment: { properties: GraphProperty }, index: number) => {
        if (index === 0) {
          return;
        }

        return CategoryNodePath.fromGraph(<NodeGraphProperty>segment.properties);
      }));

      buffer.push(JSON.stringify(path));
    },
    onCompleted: async () => {
      buffer.push(null)
      transaction.commit();
      this.session.close();
    },
    onError: () => {
      buffer.push(null);
    },
  } );

return buffer;

}

...