Используя apo c .periodi c .commit для вставки бесконечного потока json в neo4j - PullRequest
0 голосов
/ 06 января 2020

Я новичок в NEO4J и пытаюсь вставить данные из потока JSON в базу данных. Элемент root потока JSON является массивом, каждый элемент массива является объектом, который содержит ключ / значение и массив.

образец потока JSON:

[
{
 "access_point":4864834, 
 "objects": [ 
  {"class_id":10, "name":"iphone", "snr":0.557461}, 
  {"class_id":7, "name":"android", "snr":0.822390}, 
  {"class_id":7, "name":"android", "snr":0.320850}, 
  {"class_id":2, "name":"pc", "snr":0.915604}
 ] 
}, 
{
 "access_point":4864835, 
 "objects": [ 
  {"class_id":12, "name":"iphone", "snr":0.268736}, 
  {"class_id":10, "name":"android", "snr":0.585927}, 
  {"class_id":7, "name":"android", "snr":0.821383}, 
  {"class_id":2, "name":"pc", "snr":0.254997}, 
  {"class_id":7, "name":"android", "snr":0.326559}, 
  {"class_id":2, "name":"pc", "snr":0.905473}
 ] 
}, 

Поскольку это бесконечный поток, мне нужно выполнить пакетные коммиты, так как apo c .load. json никогда не достигнет конца массива.

пока мой запрос выглядит так:

CALL apoc.periodic.commit("
CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value as accesspoint MERGE(f:Accesspoint {id: accesspoint.access_point, name: accesspoint.access_point})
FOREACH(object IN accesspoint.objects | MERGE (f)-[r:OBSERVED]->(:Object {class_id:object.class_id, name:object.name, access_point_id:accesspoint.access_point}))",
{limit:10, batchSize: 10});

Это, конечно, не ссылка на поток JSON, а stati c JSON в моем github.

Есть ли способ сказать это сохранить после n элементов в массиве?

Ответы [ 2 ]

2 голосов
/ 06 января 2020

Похоже, вы должны использовать апо c .periodi c .iterate вместо apoc.periodic.commit. Например:

CALL apoc.periodic.iterate(
  "CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value AS ap",
  "MERGE(f:Accesspoint {id: ap.access_point, name: ap.access_point})
   FOREACH(obj IN ap.objects |
     MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.access_point}))",
{batchSize: 10});

apoc.periodic.iterate задокументировано для поддержки опции batchSize, которая обрабатывает N выполнений второго оператора Cypher в одной транзакции.

0 голосов
/ 09 января 2020

Поскольку у меня есть доступ к источнику данных, мы смогли изменить способ вывода JSON. Мы переключили его на JSONL (линия обозначена JSON), где каждая строка JSON по сути рассматривается как собственный документ JSON. Я использовал много ответов @cybersam, а также Майкла Хангера, поэтому спасибо.

изменил источник JSON на JSONL следующим образом:


{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}

и мой neo4j cypher запрос выглядел следующим образом:

CALL apoc.periodic.iterate(
"CALL apoc.load.jsonArray('http://13.68.174.185:8899/',null)
YIELD value AS ap",
MERGE(f:AccessPoint {id: ap.frame_id, name: ap.access_point_id})
FOREACH(obj IN frames.objects |
  MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.ap_id}))",
{batchSize: 1});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...