Когда мы выполняем коммит массового (bulk) запроса в JanusGraph в режиме OLAP и во время коммита возникает какая-либо проблема в Elasticsearch, JanusGraph не откатывает изменения, уже зафиксированные в Cassandra. Кто-нибудь знает, почему так происходит?
Внутри janusgraph-core.jar при этом вызывается следующий метод класса StandardJanusGraph:
/**
 * Persists the relations added and deleted by a transaction.
 *
 * <p>Sequence, as visible in this method:
 * <ol>
 *   <li>Return immediately when both collections are empty.</li>
 *   <li>Fix the commit time, allocate a transaction id and, unless IDs are assigned
 *       immediately, assign IDs to the added relations.</li>
 *   <li>If transaction logging is active, write a {@code PRECOMMIT} entry to the
 *       system transaction log.</li>
 *   <li>If schema elements are touched and the store reports no transaction isolation,
 *       prepare and commit them through a separately opened backend transaction.</li>
 *   <li>Prepare the remaining mutations, call {@code commitStorage()} and then
 *       {@code commitIndexes()}.</li>
 * </ol>
 *
 * <p>Per-index failures <em>returned</em> by {@code commitIndexes()} are logged and
 * recorded as {@code SECONDARY_FAILURE}; they are not rethrown, so the method completes
 * normally even though {@code commitStorage()} has already run. User-log failures are
 * handled the same way. An exception <em>thrown</em> during any step reaches the outer
 * handler, which calls {@code mutator.rollback()} and rethrows.
 * NOTE(review): whether that rollback can undo an already completed
 * {@code commitStorage()} depends on the storage backend and is not visible here.
 *
 * @param addedRelations   relations created by the transaction
 * @param deletedRelations relations removed by the transaction
 * @param tx               the transaction being committed
 * @throws RuntimeException    rethrown unchanged when the failure is a runtime exception
 * @throws JanusGraphException wrapping any other throwable raised while committing
 */
public void commit(Collection<InternalRelation> addedRelations, Collection<InternalRelation> deletedRelations, StandardJanusGraphTx tx) {
    // Nothing to persist.
    if (addedRelations.isEmpty() && deletedRelations.isEmpty()) {
        return;
    }

    log.debug("Saving transaction. Added {}, removed {}", addedRelations.size(), deletedRelations.size());
    if (!tx.getConfiguration().hasCommitTime()) {
        tx.getConfiguration().setCommitTime(times.getTime());
    }
    Instant txTimestamp = tx.getConfiguration().getCommitTime();
    long transactionId = txCounter.incrementAndGet();

    if (!tx.getConfiguration().hasAssignIDsImmediately()) {
        idAssigner.assignIDs(addedRelations);
    }

    BackendTransaction mutator = tx.getTxHandle();
    boolean acquireLocks = tx.getConfiguration().hasAcquireLocks();
    boolean hasTxIsolation = backend.getStoreFeatures().hasTxIsolation();
    // The transaction log is skipped when batch loading is enabled.
    boolean logTransaction = config.hasLogTransactions() && !tx.getConfiguration().hasEnabledBatchLoading();
    KCVSLog txLog = logTransaction ? backend.getSystemTxLog() : null;
    TransactionLogHeader txLogHeader = new TransactionLogHeader(transactionId, txTimestamp, times);

    try {
        // 1. Record the intended modifications before anything is written.
        if (logTransaction) {
            Preconditions.checkNotNull(txLog, "Transaction log is null");
            txLog.add(
                    txLogHeader.serializeModifications(serializer, LogTxStatus.PRECOMMIT, tx, addedRelations, deletedRelations),
                    txLogHeader.getLogKey());
        }

        boolean hasSchemaElements =
                !Iterables.isEmpty(Iterables.filter(deletedRelations, SCHEMA_FILTER))
                        || !Iterables.isEmpty(Iterables.filter(addedRelations, SCHEMA_FILTER));
        Preconditions.checkArgument(
                !hasSchemaElements || (!tx.getConfiguration().hasEnabledBatchLoading() && acquireLocks),
                "Attempting to create schema elements in inconsistent state");

        // 2. Without store-level isolation, schema elements go through their own
        //    backend transaction, committed before the remaining mutations.
        if (hasSchemaElements && !hasTxIsolation) {
            BackendTransaction schemaMutator = openBackendTransaction(tx);
            try {
                StandardJanusGraph.ModificationSummary schemaSummary =
                        prepareCommit(addedRelations, deletedRelations, SCHEMA_FILTER, schemaMutator, tx, acquireLocks);
                assert schemaSummary.hasModifications && !schemaSummary.has2iModifications;
            } catch (Throwable prepareFailure) {
                schemaMutator.rollback();
                throw prepareFailure;
            }
            try {
                schemaMutator.commit();
            } catch (Throwable schemaCommitFailure) {
                log.error("Could not commit transaction [" + transactionId + "] due to storage exception in system-commit", schemaCommitFailure);
                throw schemaCommitFailure;
            }
        }

        // 3. Prepare everything else (schema included when the store isolates transactions).
        StandardJanusGraph.ModificationSummary commitSummary =
                prepareCommit(addedRelations, deletedRelations, hasTxIsolation ? NO_FILTER : NO_SCHEMA_FILTER, mutator, tx, acquireLocks);

        if (!commitSummary.hasModifications) {
            mutator.commit();
        } else {
            String logTxIdentifier = tx.getConfiguration().getLogIdentifier();
            boolean hasSecondaryPersistence = logTxIdentifier != null || commitSummary.has2iModifications;

            if (logTransaction) {
                LogTxStatus primaryStatus = hasSecondaryPersistence ? LogTxStatus.PRIMARY_SUCCESS : LogTxStatus.COMPLETE_SUCCESS;
                txLog.add(txLogHeader.serializePrimary(serializer, primaryStatus), txLogHeader.getLogKey(), mutator.getTxLogPersistor());
            }

            // 4. Primary storage commit.
            try {
                mutator.commitStorage();
            } catch (Throwable storageFailure) {
                log.error("Could not commit transaction [" + transactionId + "] due to storage exception in commit", storageFailure);
                throw storageFailure;
            }

            // 5. Secondary persistence: indexes and the optional user log.
            if (!hasSecondaryPersistence) {
                mutator.commitIndexes();
            } else {
                LogTxStatus secondaryStatus = LogTxStatus.SECONDARY_SUCCESS;
                Map<String, Throwable> indexFailures = ImmutableMap.of();
                boolean userlogSuccess = true;
                try {
                    indexFailures = mutator.commitIndexes();
                    if (!indexFailures.isEmpty()) {
                        // Reported index failures are logged and recorded, not rethrown.
                        secondaryStatus = LogTxStatus.SECONDARY_FAILURE;
                        for (Map.Entry<String, Throwable> failedIndex : indexFailures.entrySet()) {
                            log.error("Error while committing index mutations for transaction [" + transactionId + "] on index: " + failedIndex.getKey(), failedIndex.getValue());
                        }
                    }

                    if (logTxIdentifier != null) {
                        try {
                            // Pessimistic until the user-log write is known to have succeeded.
                            userlogSuccess = false;
                            Log userLog = backend.getUserLog(logTxIdentifier);
                            Future<Message> pendingMessage = userLog.add(
                                    txLogHeader.serializeModifications(serializer, LogTxStatus.USER_LOG, tx, addedRelations, deletedRelations));
                            // Only a future that has already completed is inspected for a failure.
                            if (pendingMessage.isDone()) {
                                try {
                                    pendingMessage.get();
                                } catch (ExecutionException wrapped) {
                                    throw wrapped.getCause();
                                }
                            }
                            userlogSuccess = true;
                        } catch (Throwable userLogFailure) {
                            secondaryStatus = LogTxStatus.SECONDARY_FAILURE;
                            log.error("Could not user-log committed transaction [" + transactionId + "] to " + logTxIdentifier, userLogFailure);
                        }
                    }
                } finally {
                    // Always try to record the secondary outcome; a failure to do so is only logged.
                    if (logTransaction) {
                        try {
                            txLog.add(txLogHeader.serializeSecondary(serializer, secondaryStatus, indexFailures, userlogSuccess), txLogHeader.getLogKey());
                        } catch (Throwable txLogFailure) {
                            log.error("Could not tx-log secondary persistence status on transaction [" + transactionId + "]", txLogFailure);
                        }
                    }
                }
            }
        }
    } catch (Throwable failure) {
        log.error("Could not commit transaction [" + transactionId + "] due to exception", failure);
        try {
            mutator.rollback();
        } catch (Throwable rollbackFailure) {
            log.error("Could not roll-back transaction [" + transactionId + "] after failure due to exception", rollbackFailure);
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        throw new JanusGraphException("Unexpected exception", failure);
    }
}
Есть идеи, почему при сбое вторичного хранилища (secondary persistence, например индексов) исключение не выбрасывается?