Ошибка тайм-аута Neo4j / Py2Neo при импорте больших файлов CSV - PullRequest
0 голосов
/ 31 мая 2019

При импорте данных из больших CSV-файлов (> 200 МБ) в Neo4j ответ заканчивается зависанием. Запрос завершается , и все записи импортируются, однако, по-видимому, имеется какое-то время ожидания ответа, которое не приводит к указанию того, что запрос на импорт завершен.Это проблема, поскольку мы не можем автоматизировать импорт нескольких файлов в Neo4j, поскольку скрипт продолжает ожидать завершения запроса, даже если он уже есть.

Импорт 1 файла занимает около 10-15 минут.

Никаких ошибок нигде в конвейере не выдается, все просто зависает.Я могу только сказать, когда процесс завершился, когда активность ЦП виртуальной машины прекратилась.

Этот процесс работает с небольшими файлами и отправляет подтверждение после завершения работы предыдущего файла.импортируется и переходит к следующему.

Я попытался запустить сценарии как из записной книжки Jupyter, так и из сценария Python непосредственно на консоли.Я также даже пытался запустить запрос непосредственно на Neo4j через консоль браузера.Каждый способ приводит к зависанию запросов, поэтому я не уверен, что проблема связана с Neo4j или Py2Neo.

Пример запроса:

USING PERIODIC COMMIT 1000
LOAD CSV FROM {csvfile}  AS line
MERGE (:Author { authorid: line[0], name: line[1] } )

Модифицированный скрипт Python с использованием Py2Neo:

from azure.storage.blob import BlockBlobService
blob_service = BlockBlobService(account_name="<name>",account_key="<key>")
generator = blob_service.list_blobs("parsed-csv-files")

for blob in generator:
    print(blob.name)
    csv_file_base = "http://<base_uri>/parsed-csv-files/"
    csvfile = csv_file_base + blob.name
    params = { "csvfile":csvfile }
    mygraph.run(query, parameters=params )

Neo4j debug.log, похоже, не записывает никаких ошибок.

Пример debug.log:

2019-05-30 05:44:32.022+0000 INFO [o.n.k.i.i.s.GenericNativeIndexProvider] Schema index cleanup job finished: descriptor=IndexRule[id=16, descriptor=Index( UNIQUE, :label[5](property[5]) ), provider={key=native-btree, version=1.0}, owner=42], indexFile=/data/databases/graph.db/schema/index/native-btree-1.0/16/index-16 Number of pages visited: 598507, Number of cleaned crashed pointers: 0, Time spent: 2m 25s 235ms
2019-05-30 05:44:32.071+0000 INFO [o.n.k.i.i.s.GenericNativeIndexProvider] Schema index cleanup job closed: descriptor=IndexRule[id=16, descriptor=Index( UNIQUE, :label[5](property[5]) ), provider={key=native-btree, version=1.0}, owner=42], indexFile=/data/databases/graph.db/schema/index/native-btree-1.0/16/index-16
2019-05-30 05:44:32.071+0000 INFO [o.n.k.i.i.s.GenericNativeIndexProvider] Schema index cleanup job started: descriptor=IndexRule[id=19, descriptor=Index( UNIQUE, :label[6](property[6]) ), provider={key=native-btree, version=1.0}, owner=46], indexFile=/data/databases/graph.db/schema/index/native-btree-1.0/19/index-19
2019-05-30 05:44:57.126+0000 INFO [o.n.k.i.i.s.GenericNativeIndexProvider] Schema index cleanup job finished: descriptor=IndexRule[id=19, descriptor=Index( UNIQUE, :label[6](property[6]) ), provider={key=native-btree, version=1.0}, owner=46], indexFile=/data/databases/graph.db/schema/index/native-btree-1.0/19/index-19 Number of pages visited: 96042, Number of cleaned crashed pointers: 0, Time spent: 25s 55ms
2019-05-30 05:44:57.127+0000 INFO [o.n.k.i.i.s.GenericNativeIndexProvider] Schema index cleanup job closed: descriptor=IndexRule[id=19, descriptor=Index( UNIQUE, :label[6](property[6]) ), provider={key=native-btree, version=1.0}, owner=46], indexFile=/data/databases/graph.db/schema/index/native-btree-1.0/19/index-19

РЕДАКТИРОВАТЬ: использовал более простой запрос, которыйвсе еще дает ту же проблему

1 Ответ

0 голосов
/ 02 июня 2019

Поскольку выполнение запроса на стороне БД может занять много времени, возможно, у py2neo возникли проблемы с ожиданием.

Не должно быть проблем с периодической фиксацией.

Вы пробовали драйвер Python neo4j и читали csv из python и выполняли запрос таким образом?

Вот пример кода с драйвером neo4j.

import pandas as pd
from neo4j import GraphDatabase

driver = GraphDatabase.driver(serveruri, auth=(user,pwd))
with driver.session() as session:
    file = config['spins_file']
    row_chunks = pd.read_csv(file, sep=',', error_bad_lines=False,
                       index_col=False,
                       low_memory=False,
                       chunksize=config['chunk_size'])
    for i, rows in enumerate(row_chunks):
        print("Chunk {}".format(i))
        rows_dict = {'rows': rows.fillna(value="").to_dict('records')}
        session.run(statement="""
                    unwind data.rows as row
                    MERGE (:Author { authorid: line[0], name: line[1] } )
                    """,
                    dict=rows_dict)
...