Я нашел решение, которое довольно быстро запрашивает у Cassandra потоковые данные.Было бы полезно для кого-то с той же проблемой.
Во-первых, Cassandra может быть запрошен с таким небольшим количеством кода, как,
Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");
Но проблема с этим заключается в создании Session
очень дорогостоящая операция и то, что должно быть выполнено один раз для каждого ключевого пространства.Вы создаете Session
один раз и повторно используете его для всех запросов на чтение.
Теперь, поскольку Session
не является сериализуемым в Java, его нельзя передать в качестве аргумента операторам Flink, таким как Map
или ProcessFunction
.Есть несколько способов решения этой проблемы: вы можете использовать RichFunction и инициализировать его в методе Open
или использовать Singleton.Я буду использовать второе решение.
Создайте класс Singleton следующим образом, где мы создадим Session
.
public class CassandraSessionSingleton {
private static CassandraSessionSingleton cassandraSessionSingleton = null;
public Session session;
private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
Cluster cluster = clusterBuilder.getCluster();
session = cluster.connect();
}
public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
if (cassandraSessionSingleton == null)
cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
return cassandraSessionSingleton;
}
}
Затем вы сможете использовать этот сеанс для всех будущих запросов.Здесь я использую ProcessFunction
для выполнения запросов в качестве примера.
public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
ClusterBuilder secureCassandraSinkClusterBuilder;
// Constructor
public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
}
@Override
public void ProcessElement (Object obj) throws Exception {
ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
return resultSet;
}
}
Обратите внимание, что вы можете передать ClusterBuilder
в ProcessFunction
, поскольку он сериализуем.Теперь для метода cassandraLookUp
, в котором мы выполняем запрос.
public class CassandraLookUp {
public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
Session session = cassandraSessionSingleton.session;
ResultSet resultSet = session.execute(query);
return resultSet;
}
}
Объект singleton создается только при первом запуске запроса, после этого тот же объект используется повторно, поэтому задержка отсутствуетв поиске.