Как читать с Кассандры с помощью Apache Flink? - PullRequest
0 голосов
/ 05 июня 2018

Моя программа flink должна выполнить поиск Cassandra для каждой входной записи и, основываясь на результатах, должна выполнить некоторую дальнейшую обработку.

Но в настоящее время я застреваю при чтении данных из Cassandra.Это фрагмент кода, который я придумал до сих пор.

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .build();
        }
    };

    for (int i=1; i<5; i++) {
        CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
                new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
        cassandraInputFormat.configure(null);
        cassandraInputFormat.open(null);
        Tuple2<String, String> out = new Tuple8<>();
        cassandraInputFormat.nextRecord(out);
        System.out.println(out);
    }

Но проблема в том, что для каждого просмотра требуется около 10 секунд, другими словами, этот цикл for занимает50 секунд на выполнение.

Как ускорить эту операцию?В качестве альтернативы, есть ли другой способ поиска Кассандры во Флинке?

1 Ответ

0 голосов
/ 06 июня 2018

Я нашел решение, которое довольно быстро запрашивает у Cassandra потоковые данные.Было бы полезно для кого-то с той же проблемой.

Во-первых, Cassandra может быть запрошен с таким небольшим количеством кода, как,

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

Но проблема с этим заключается в создании Sessionочень дорогостоящая операция и то, что должно быть выполнено один раз для каждого ключевого пространства.Вы создаете Session один раз и повторно используете его для всех запросов на чтение.

Теперь, поскольку Session не является сериализуемым в Java, его нельзя передать в качестве аргумента операторам Flink, таким как Map или ProcessFunction.Есть несколько способов решения этой проблемы: вы можете использовать RichFunction и инициализировать его в методе Open или использовать Singleton.Я буду использовать второе решение.

Создайте класс Singleton следующим образом, где мы создадим Session.

public class CassandraSessionSingleton {
    private static CassandraSessionSingleton cassandraSessionSingleton = null;

    public Session session;

    private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
        Cluster cluster = clusterBuilder.getCluster();
        session = cluster.connect();
    }

    public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
        if (cassandraSessionSingleton == null)
            cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
        return cassandraSessionSingleton;
    }

}

Затем вы сможете использовать этот сеанс для всех будущих запросов.Здесь я использую ProcessFunction для выполнения запросов в качестве примера.

public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
    ClusterBuilder secureCassandraSinkClusterBuilder;

    // Constructor
    public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
        this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
    }

    @Override
    public void  ProcessElement (Object obj) throws Exception {
        ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
        return resultSet;
    }
}

Обратите внимание, что вы можете передать ClusterBuilder в ProcessFunction, поскольку он сериализуем.Теперь для метода cassandraLookUp, в котором мы выполняем запрос.

public class CassandraLookUp {
    public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
        CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
        Session session = cassandraSessionSingleton.session;
        ResultSet resultSet = session.execute(query);
        return resultSet;
    }
}

Объект singleton создается только при первом запуске запроса, после этого тот же объект используется повторно, поэтому задержка отсутствуетв поиске.

...