JanusGraph выбрасывает InterruptedException в CompletableFuture для KeyValueStore - PullRequest
0 голосов
/ 09 января 2020

language = Java 8

Я использую FoundationDB в качестве бэкенда хранения для JanusGraph. При выполнении чтения диапазона (range read) я хочу получить весь результат в виде списка. Метод asList() интерфейса AsyncIterable возвращает CompletableFuture, у которого я вызываю get(), чтобы получить результат. В большинстве случаев это работает, но в производственной среде примерно для 1% запросов get() завершается с InterruptedException. Если повторить запрос несколько раз, я получаю правильный результат.

result = transaction.getRange(new Range(startKey, endKey), limit).asList().get();

У меня есть несколько вопросов

  • Почему вызов get() у CompletableFuture завершается с InterruptedException?
  • Как я могу минимизировать возникновение InterruptedException?
  • Может ли JanusGraph прервать CompletableFuture?
  • Может ли FoundationDB выбрасывать InterruptedException в своём getRange?

Я использую адаптер FoundationDB для JanusGraph (githubLink)

трассировка стека исключений

java.lang.InterruptedException
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.experoinc.janusgraph.diskstorage.foundationdb.FoundationDBTx.getMultiRange(FoundationDBTx.java:241)
at com.experoinc.janusgraph.diskstorage.foundationdb.FoundationDBKeyValueStore.getSlices(FoundationDBKeyValueStore.java:191)
at org.janusgraph.diskstorage.keycolumnvalue.keyvalue.OrderedKeyValueStoreAdapter.getSlice(OrderedKeyValueStoreAdapter.java:78)
at org.janusgraph.diskstorage.keycolumnvalue.KCVSProxy.getSlice(KCVSProxy.java:81)
at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:287)
at org.janusgraph.diskstorage.BackendTransaction$2.call(BackendTransaction.java:284)
at org.janusgraph.diskstorage.util.BackendOperation.executeDirect(BackendOperation.java:68)
at org.janusgraph.diskstorage.util.BackendOperation.execute(BackendOperation.java:54)
at org.janusgraph.diskstorage.BackendTransaction.executeRead(BackendTransaction.java:469)
at org.janusgraph.diskstorage.BackendTransaction.edgeStoreMultiQuery(BackendTransaction.java:284)
at org.janusgraph.graphdb.database.StandardJanusGraph.edgeMultiQuery(StandardJanusGraph.java:450)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.lambda$executeMultiQuery$5(StandardJanusGraphTx.java:1108)
at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:99)
at org.janusgraph.graphdb.query.profile.QueryProfiler.profile(QueryProfiler.java:91)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.executeMultiQuery(StandardJanusGraphTx.java:1108)
at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.execute(MultiVertexCentricQueryBuilder.java:113)
at org.janusgraph.graphdb.query.vertex.MultiVertexCentricQueryBuilder.edges(MultiVertexCentricQueryBuilder.java:133)
at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.initializeMultiQuery(JanusGraphVertexStep.java:138)
at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.initialize(JanusGraphVertexStep.java:116)
at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphVertexStep.processNextStart(JanusGraphVertexStep.java:181)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.next(ExpandableStepIterator.java:50)
at org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep.processNextStart(FilterStep.java:37)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.next(ExpandableStepIterator.java:50)
at org.apache.tinkerpop.gremlin.process.traversal.step.map.MapStep.processNextStart(MapStep.java:36)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.hasNext(ExpandableStepIterator.java:42)
at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphPropertiesStep.initialize(JanusGraphPropertiesStep.java:89)
at org.janusgraph.graphdb.tinkerpop.optimize.JanusGraphPropertiesStep.processNextStart(JanusGraphPropertiesStep.java:124)
at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
at org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal.hasNext(DefaultTraversal.java:192)
at org.apache.tinkerpop.gremlin.server.util.TraverserIterator.fillBulker(TraverserIterator.java:69)
at org.apache.tinkerpop.gremlin.server.util.TraverserIterator.hasNext(TraverserIterator.java:56)
at org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor.handleIterator(TraversalOpProcessor.java:483)
at org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor.lambda$iterateBytecodeTraversal$4(TraversalOpProcessor.java:382)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

код getMultiRange

/**
 * Runs several range reads and collects the rows returned for each query.
 *
 * <p>Every element of {@code queries} is an array whose slot 0 holds the {@code KVQuery},
 * slot 1 the start key ({@code byte[]}) and slot 2 the end key ({@code byte[]}).
 *
 * <p>Reads are issued asynchronously in rounds of at most {@code maxRuns * 5}. Each round
 * submits only the queries that have not been answered yet and then waits for all of that
 * round's futures before deciding whether another round is needed. When a read fails and
 * the transaction counter has not changed since the read was issued, {@code restart()} is
 * invoked so the next round does not reuse the failed transaction state.
 *
 * @param queries the range reads to execute, encoded as described above
 * @return a map from each answered query to its rows (an empty list when the read produced
 *         {@code null}); queries that never succeeded within the round budget are absent
 * @throws PermanentBackendException if the calling thread is interrupted while waiting, or
 *         waiting fails with an unexpected exception
 */
public synchronized Map<KVQuery, List<KeyValue>> getMultiRange(final List<Object[]> queries)
        throws PermanentBackendException {
    final Map<KVQuery, List<KeyValue>> resultMap = new ConcurrentHashMap<>();
    // Copy-on-write so completion callbacks can remove entries while a round iterates a snapshot.
    final List<Object[]> retries = new CopyOnWriteArrayList<>(queries);

    for (int round = 0; round < (maxRuns * 5) && !retries.isEmpty(); round++) {
        final List<CompletableFuture<?>> futures = new LinkedList<>();

        for (final Object[] obj : retries) {
            final KVQuery query = (KVQuery) obj[0];
            final byte[] start = (byte[]) obj[1];
            final byte[] end = (byte[]) obj[2];

            final int startTxId = txCtr.get();
            try {
                futures.add(tx.getRange(start, end, query.getLimit()).asList()
                        .whenComplete((res, th) -> {
                            if (th == null) {
                                // Remove the queued array itself: the list holds Object[] entries,
                                // so removing the KVQuery would never match anything.
                                retries.remove(obj);
                                if (res == null) {
                                    res = Collections.emptyList();
                                }
                                resultMap.put(query, res);
                            } else if (startTxId == txCtr.get()) {
                                // Restart only if nobody else restarted since this read was issued.
                                this.restart();
                            }
                        }));
            } catch (IllegalStateException ignored) {
                // Thrown when the tx state changes prior to the getRange call; the query stays
                // in 'retries' and is attempted again in the next round.
            }
        }

        // Wait for this round to settle before re-issuing anything, so that only reads which
        // actually failed are retried.
        for (final CompletableFuture<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException ie) {
                // Preserve the interrupt for callers further up the stack before translating it.
                Thread.currentThread().interrupt();
                log.error("failed to get multi range for queries {}", queries, ie);
                throw new PermanentBackendException(ie);
            } catch (ExecutionException | IllegalStateException ignored) {
                // Some reads fail because tx time limits are exceeded or the tx was closed while
                // the read was in flight; those queries remain in 'retries' for the next round.
            } catch (Exception e) {
                log.error("failed to get multi range for queries {}", queries, e);
                throw new PermanentBackendException(e);
            }
        }
    }

    if (!retries.isEmpty()) {
        // Partial results are still returned (as before), but make the gap visible.
        log.warn("getMultiRange gave up on {} of {} queries after exhausting retries",
                retries.size(), queries.size());
    }

    return resultMap;
}
...