apoc.periodic.iterate динамический список процессов с использованием Spring / Java - PullRequest
0 голосов
/ 02 февраля 2019
  1. Я хочу передать список, который необходимо повторить, и создать из него узлы и отношения.

  2. Я сейчас использую Apoc.periodic.iterateпотому что ранее я пытался с размоткой, которая привела к тупику из-за параллельного выполнения.

  3. Помогите мне исправить текущий раздел кода.

  4. Яобработка более 900 тыс. данных кусками.Который я прочитаю из тем Кафки.

    Текущий раздел кода: @Query("CALL apoc.periodic.iterate("+"\"" + " UNWIND $items as item return item\" ," + "\"MERGE (src:Item {name: item.name}) " + " ON CREATE SET src.name = item.name, " + " src.updateDate = {processDate} " + "MERGE (dest:ItemDet {upc: item.upc}) " + " ON CREATE SET dest.upc = item.upc, " + " dest.creationDate = {processDate} " )\", {batchSize:1000, parallel:false,iterateList:true,params:{items:{items}}})") Result mergeItems(@Param("items") List<?> items, @Param("processDate") String processDate);

    Предыдущий раздел кода: @Query("UNWIND {items} as item MERGE(src:Item {name: item.name}) " + " ON CREATE SET src.name = item.name, " + " src.updateDate = {processDate} " + "MERGE (dest:ItemDet {upc: item.upc}) " + " ON CREATE SET dest.upc = item.upc, " + " dest.creationDate = {processDate} ") Result mergeItems(@Param("items") List<?> items, @Param("processDate") String processDate);

ЖУРНАЛ ОШИБКИ

ERROR Unexpected error detected in bolt session '8c8590fffeb2c45f-000098a4-00000004-54a89d3a07b39611-ca025a5c'. Failed to process a bolt message
org.neo4j.bolt.v1.runtime.BoltConnectionFatality: Failed to process a bolt message
    at org.neo4j.bolt.v1.runtime.BoltStateMachine.handleFailure(BoltStateMachine.java:742)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine.handleFailure(BoltStateMachine.java:728)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine.access$500(BoltStateMachine.java:62)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine$State$1.init(BoltStateMachine.java:435)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine.init(BoltStateMachine.java:145)
    at org.neo4j.bolt.v1.messaging.BoltMessageRouter.lambda$onInit$0(BoltMessageRouter.java:70)
    at org.neo4j.bolt.runtime.DefaultBoltConnection.processNextBatch(DefaultBoltConnection.java:195)
    at org.neo4j.bolt.runtime.DefaultBoltConnection.processNextBatch(DefaultBoltConnection.java:143)
    at org.neo4j.bolt.runtime.ExecutorBoltScheduler.executeBatch(ExecutorBoltScheduler.java:170)
    at org.neo4j.bolt.runtime.ExecutorBoltScheduler.lambda$scheduleBatchOrHandleError$2(ExecutorBoltScheduler.java:153)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.neo4j.bolt.security.auth.AuthenticationException: The client is unauthorized due to authentication failure.
    at org.neo4j.bolt.security.auth.BasicAuthentication.doAuthenticate(BasicAuthentication.java:78)
    at org.neo4j.bolt.security.auth.BasicAuthentication.authenticate(BasicAuthentication.java:60)
    at org.neo4j.bolt.v1.runtime.BoltStateMachineSPI.authenticate(BoltStateMachineSPI.java:93)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine$State$1.init(BoltStateMachine.java:412)
    ... 10 more
2019-02-02 02:31:53.511+0000 INFO  starting batching from `UNWIND $ingredientUpcModels as node return node` operation using iteration `MERGE (src:Token {name: node.tokenKey})  ON CREATE SET src.name = node.tokenKey, src.creationDate = {processDate},  src.updateDate = {processDate} MERGE (dest:Item {divUpc: node.divUpc})  ON CREATE SET dest.upc = node.upc,    dest.divNumber = node.division,    dest.divUpc = node.divUpc,    dest.creationDate = {processDate} MERGE (src)-[r:TOKEN_ITEM]->(dest)  ON CREATE SET r.creationDate = {processDate}  SET r.score = node.score, r.updateDate = {processDate} )` in separate thread
2019-02-02 02:31:53.544+0000 WARN  Error during iterate.commit:
2019-02-02 02:31:53.544+0000 WARN  1 times: org.neo4j.graphdb.TransactionFailureException: Transaction was marked as successful, but unable to commit transaction so rolled back.
2019-02-02 02:31:53.545+0000 WARN  Error during iterate.execute:
2019-02-02 02:31:53.545+0000 WARN  1 times: Invalid input ')': expected whitespace, comment, '.', node labels, '[', "=~", IN, STARTS, ENDS, CONTAINS, IS, '^', '*', '/', '%', '+', '-', '=', "<>", "!=", '<', '>', "<=", ">=", AND, XOR, OR, ',', LOAD CSV, FROM, INTO, START, MATCH, UNWIND, MERGE, CREATE GRAPH >>, CREATE >> GRAPH, CREATE GRAPH, CREATE, SET, DELETE GRAPHS, DELETE, REMOVE, FOREACH, WITH, CALL, PERSIST, RELOCATE, RETURN, SNAPSHOT, UNION, ';' or end of input (line 1, column 515 (offset: 514))
"UNWIND {_batch} AS _batch WITH _batch.node AS node  MERGE (src:Token {name: node.tokenKey})  ON CREATE SET src.name = node.tokenKey, src.creationDate = {processDate},  src.updateDate = {processDate} MERGE (dest:Item {divUpc: node.divUpc})  ON CREATE SET dest.upc = node.upc,    dest.divNumber = node.division,    dest.divUpc = node.divUpc,    dest.creationDate = {processDate} MERGE (src)-[r:TOKEN_ITEM]->(dest)  ON CREATE SET r.creationDate = {processDate}  SET r.score = node.score, r.updateDate = {processDate} )"
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   ^
2019-02-02 02:31:53.741+0000 INFO  starting batching from `UNWIND $ingredientUpcModels as node return node` operation using iteration `MERGE (src:Token {name: node.tokenKey})  ON CREATE SET src.name = node.tokenKey, src.creationDate = {processDate},  src.updateDate = {processDate} MERGE (dest:Item {divUpc: node.divUpc})  ON CREATE SET dest.upc = node.upc,    dest.divNumber = node.division,    dest.divUpc = node.divUpc,    dest.creationDate = {processDate} MERGE (src)-[r:TOKEN_ITEM]->(dest)  ON CREATE SET r.creationDate = {processDate}  SET r.score = node.score, r.updateDate = {processDate} )` in separate thread
2019-02-02 02:31:53.752+0000 WARN  Error during iterate.commit:
2019-02-02 02:31:53.752+0000 WARN  1 times: org.neo4j.graphdb.TransactionFailureException: Transaction was marked as successful, but unable to commit transaction so rolled back.
2019-02-02 02:31:53.752+0000 WARN  Error during iterate.execute:
2019-02-02 02:31:53.752+0000 WARN  1 times: Invalid input ')': expected whitespace, comment, '.', node labels, '[', "=~", IN, STARTS, ENDS, CONTAINS, IS, '^', '*', '/', '%', '+', '-', '=', "<>", "!=", '<', '>', "<=", ">=", AND, XOR, OR, ',', LOAD CSV, FROM, INTO, START, MATCH, UNWIND, MERGE, CREATE GRAPH >>, CREATE >> GRAPH, CREATE GRAPH, CREATE, SET, DELETE GRAPHS, DELETE, REMOVE, FOREACH, WITH, CALL, PERSIST, RELOCATE, RETURN, SNAPSHOT, UNION, ';' or end of input (line 1, column 515 (offset: 514))
"UNWIND {_batch} AS _batch WITH _batch.node AS node  MERGE (src:Token {name: node.tokenKey})  ON CREATE SET src.name = node.tokenKey, src.creationDate = {processDate},  src.updateDate = {processDate} MERGE (dest:Item {divUpc: node.divUpc})  ON CREATE SET dest.upc = node.upc,    dest.divNumber = node.division,    dest.divUpc = node.divUpc,    dest.creationDate = {processDate} MERGE (src)-[r:TOKEN_ITEM]->(dest)  ON CREATE SET r.creationDate = {processDate}  SET r.score = node.score, r.updateDate = {processDate} )"

1 Ответ

0 голосов
/ 03 февраля 2019
1) Its due to syntax error .I have found the fix.

 @Query("CALL apoc.periodic.iterate("+"\""
    + " UNWIND $items as item return item\" ,"
    + "\"MERGE (src:Item {name: item.name}) "
    + " ON CREATE SET src.name = item.name, "
    + " src.updateDate = {processDate} "
    + "MERGE (dest:ItemDet {upc: item.upc}) "
    + " ON CREATE SET dest.upc = item.upc, "
    + "   dest.creationDate = {processDate} "\", 
    {batchSize:1000, parallel:false,iterateList:true,params:{items:{items},{processDate:{processDate}}}})")
     Result mergeItems(@Param("items") List<?> items,
       @Param("processDate") String processDate);
 
...