Как получить два объекта PreparedStatement для одной строки запроса - PullRequest
0 голосов
/ 13 февраля 2020

У нас есть сценарий использования, когда нам нужно иметь два отдельных объекта PreparedStatement (один с RetryPolicy, а другой без какой-либо политики повтора) для одной и той же строки запроса. И когда мы пытаемся создать два отдельных PreparedStatements, мы видим, что сохраняется только один PreparedStatement (предыдущий PreparedStatement переопределяется последним PreparedStatement). Мы рассмотрели реализацию кода и увидели, что метод Cluster.addPrepared ищет какой-то MD5 га sh для запроса, и он переопределяет старый PreparedStatement с новым PreparedStatement.

Чтобы преодолеть это поведение, мы создание двух отдельных запросов (второй запрос имеет дополнительный пробел в конце по сравнению с первым запросом), чтобы различать их guish, чтобы в итоге мы получили два подготовленных оператора.

Например: Первый запрос: select * from test, Второй запрос: "select * from test "

Есть ли лучший способ создания двух отдельных PreparedStatements для той же строки запроса, как того требует наш вариант использования? Для нас это выглядит подверженным ошибкам.

Мы используем cassandra-drive-core-3.1.0-shaded.jar

import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.LoggingRetryPolicy;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Stream;

public class CassandraHelper {

    private static final Logger log = LoggerFactory.getLogger(CassandraHelper.class);
    private final String hostname;
    private final int port;
    private Cluster cluster;
    private Session session;

    private static final String selectStackoverflowCql =
            "SELECT key_part_one, key_part_two, data FROM test_keyspace.stackoverflow_composite WHERE key_part_one=? AND key_part_two=?";

    private static final String selectStackoverflowCqlWithSpace =
            "SELECT key_part_one, key_part_two, data FROM test_keyspace.stackoverflow_composite WHERE key_part_one=? AND key_part_two=? ";

    private PreparedStatement selectStackoverflowWithoutRetryPolicyStatement;
    private PreparedStatement selectStackoverflowStatement;
    private PreparedStatement selectStackoverflowStatementWithSpace;

    public CassandraHelper(String host, int port) {
        this.hostname = host;
        this.port = port;
        connect();
        install_schema();
        selectStackoverflowStatement = session.prepare(selectStackoverflowCql)
                .setRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
        selectStackoverflowWithoutRetryPolicyStatement = session.prepare(selectStackoverflowCql);
        selectStackoverflowStatementWithSpace = session.prepare(selectStackoverflowCqlWithSpace)
                .setRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
    }

    public void connect() {
        int maxRetries = 2;
        for (int numReties = 0; numReties < maxRetries; numReties += 1) {
            log.info("Connecting (attempt {})...", numReties);
            try {
                cluster = buildCluster();
                session = cluster.connect();
                log.info("Created session");
                ResultSet rs = session.execute("select release_version from system.local");
                Row row = rs.one();
                log.info("Version: {}", row.getString("release_version"));
                return;
            } catch (NoHostAvailableException ex) { // if we get this exception, wait for Cassandra to become available
                log.info("Got NoHostAvailableException");
            }
        }
        throw new RuntimeException("Failed to connect to Cassandra cluster");
    }

    private Cluster buildCluster() {

        if (this.cluster == null || this.cluster.isClosed()) {
            this.cluster = Cluster.builder()
                    .withMaxSchemaAgreementWaitSeconds(20)
                    .addContactPointsWithPorts(Lists.newArrayList(new InetSocketAddress(this.hostname, this.port)))
                    .withoutJMXReporting().build();
            this.cluster.init();
            log.info("Built cluster");
            cluster.getConfiguration().getQueryOptions().setDefaultIdempotence(true);
        }
        return cluster;
    }

    public PreparedStatement getSelectStackoverflowStatementWithSpace() {
        return selectStackoverflowStatementWithSpace;
    }

    public PreparedStatement getSelectStackoverflowWithoutRetryPolicyStatement() {
        return selectStackoverflowWithoutRetryPolicyStatement;
    }

    public PreparedStatement getSelectStackoverflowStatement() {
        return selectStackoverflowStatement;
    }


    private void install_schema() {
        List<String> creationCqls = Lists.newArrayList(
                "CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 1 }",
                "CREATE TABLE IF NOT EXISTS test_keyspace.stackoverflow_composite (key_part_one text, key_part_two int, data text, PRIMARY KEY(key_part_one, key_part_two))");
        Stream<String> stream = creationCqls.stream();
        stream.forEach(cql -> {
            if (!executeSchemaChange(cql)) {
                log.info("Schema agreement timeout attempting to execute {}. (This is not a problem as we will wait for schema agreement in the verification step.", cql);
            }
        });
    }

    public boolean executeSchemaChange(String cql) {
        ResultSet rs = session.execute(cql);
        return rs.getExecutionInfo().isSchemaInAgreement();
    }

    public static void main(String[] args) {
        CassandraHelper ch = new CassandraHelper("localhost", 32769);
        assert (ch.getSelectStackoverflowWithoutRetryPolicyStatement() == ch.getSelectStackoverflowStatement());
        assert (ch.getSelectStackoverflowStatementWithSpace() != ch.getSelectStackoverflowStatement());
        System.out.println("Statement without Retry Statement: " + ch.getSelectStackoverflowWithoutRetryPolicyStatement());
        System.out.println("Statement with Retry and query not having space: " + ch.getSelectStackoverflowStatement());
        System.out.println("Statement with Retry and query with space: " + ch.getSelectStackoverflowStatementWithSpace());
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...