У нас есть сценарий использования, когда нам нужно иметь два отдельных объекта PreparedStatement (один с RetryPolicy, а другой без какой-либо политики повтора) для одной и той же строки запроса. И когда мы пытаемся создать два отдельных PreparedStatements, мы видим, что сохраняется только один PreparedStatement (предыдущий PreparedStatement переопределяется последним PreparedStatement). Мы рассмотрели реализацию кода и увидели, что метод Cluster.addPrepared
ищет какой-то MD5 га sh для запроса, и он переопределяет старый PreparedStatement с новым PreparedStatement.
Чтобы преодолеть это поведение, мы создание двух отдельных запросов (второй запрос имеет дополнительный пробел в конце по сравнению с первым запросом), чтобы различать их guish, чтобы в итоге мы получили два подготовленных оператора.
Например: Первый запрос: select * from test
, Второй запрос: "select * from test "
Есть ли лучший способ создания двух отдельных PreparedStatements для той же строки запроса, как того требует наш вариант использования? Для нас это выглядит подверженным ошибкам.
Мы используем cassandra-drive-core-3.1.0-shaded.jar
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.LoggingRetryPolicy;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Stream;
public class CassandraHelper {
private static final Logger log = LoggerFactory.getLogger(CassandraHelper.class);
private final String hostname;
private final int port;
private Cluster cluster;
private Session session;
private static final String selectStackoverflowCql =
"SELECT key_part_one, key_part_two, data FROM test_keyspace.stackoverflow_composite WHERE key_part_one=? AND key_part_two=?";
private static final String selectStackoverflowCqlWithSpace =
"SELECT key_part_one, key_part_two, data FROM test_keyspace.stackoverflow_composite WHERE key_part_one=? AND key_part_two=? ";
private PreparedStatement selectStackoverflowWithoutRetryPolicyStatement;
private PreparedStatement selectStackoverflowStatement;
private PreparedStatement selectStackoverflowStatementWithSpace;
public CassandraHelper(String host, int port) {
this.hostname = host;
this.port = port;
connect();
install_schema();
selectStackoverflowStatement = session.prepare(selectStackoverflowCql)
.setRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
selectStackoverflowWithoutRetryPolicyStatement = session.prepare(selectStackoverflowCql);
selectStackoverflowStatementWithSpace = session.prepare(selectStackoverflowCqlWithSpace)
.setRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
}
public void connect() {
int maxRetries = 2;
for (int numReties = 0; numReties < maxRetries; numReties += 1) {
log.info("Connecting (attempt {})...", numReties);
try {
cluster = buildCluster();
session = cluster.connect();
log.info("Created session");
ResultSet rs = session.execute("select release_version from system.local");
Row row = rs.one();
log.info("Version: {}", row.getString("release_version"));
return;
} catch (NoHostAvailableException ex) { // if we get this exception, wait for Cassandra to become available
log.info("Got NoHostAvailableException");
}
}
throw new RuntimeException("Failed to connect to Cassandra cluster");
}
private Cluster buildCluster() {
if (this.cluster == null || this.cluster.isClosed()) {
this.cluster = Cluster.builder()
.withMaxSchemaAgreementWaitSeconds(20)
.addContactPointsWithPorts(Lists.newArrayList(new InetSocketAddress(this.hostname, this.port)))
.withoutJMXReporting().build();
this.cluster.init();
log.info("Built cluster");
cluster.getConfiguration().getQueryOptions().setDefaultIdempotence(true);
}
return cluster;
}
public PreparedStatement getSelectStackoverflowStatementWithSpace() {
return selectStackoverflowStatementWithSpace;
}
public PreparedStatement getSelectStackoverflowWithoutRetryPolicyStatement() {
return selectStackoverflowWithoutRetryPolicyStatement;
}
public PreparedStatement getSelectStackoverflowStatement() {
return selectStackoverflowStatement;
}
private void install_schema() {
List<String> creationCqls = Lists.newArrayList(
"CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 1 }",
"CREATE TABLE IF NOT EXISTS test_keyspace.stackoverflow_composite (key_part_one text, key_part_two int, data text, PRIMARY KEY(key_part_one, key_part_two))");
Stream<String> stream = creationCqls.stream();
stream.forEach(cql -> {
if (!executeSchemaChange(cql)) {
log.info("Schema agreement timeout attempting to execute {}. (This is not a problem as we will wait for schema agreement in the verification step.", cql);
}
});
}
public boolean executeSchemaChange(String cql) {
ResultSet rs = session.execute(cql);
return rs.getExecutionInfo().isSchemaInAgreement();
}
public static void main(String[] args) {
CassandraHelper ch = new CassandraHelper("localhost", 32769);
assert (ch.getSelectStackoverflowWithoutRetryPolicyStatement() == ch.getSelectStackoverflowStatement());
assert (ch.getSelectStackoverflowStatementWithSpace() != ch.getSelectStackoverflowStatement());
System.out.println("Statement without Retry Statement: " + ch.getSelectStackoverflowWithoutRetryPolicyStatement());
System.out.println("Statement with Retry and query not having space: " + ch.getSelectStackoverflowStatement());
System.out.println("Statement with Retry and query with space: " + ch.getSelectStackoverflowStatementWithSpace());
}
}