Я могу подключиться к AWS Управляемой службе Cassandra, используя приведенный ниже фрагмент кода.
CassandraSink.addSink(cassandraEntityStream)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withCredentials(
"username",
"password")
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc("ap-northeast-1")
.build())
//.withQueryOptions(option)
.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.build()
.name("Write to Cassandra")
.uid("cassandra_sink");
При получении потока POJO для Cassandra я получил следующее исключение:
com.datastax.driver.core.exceptions.InvalidQueryException: уровень согласованности LOCAL_ONE не поддерживается для этой операции. Поддерживаемые уровни согласованности: LOCAL_QUORUM
Мне удалось решить эту проблему в другом проекте (без использования flink), установив ConsistencyLevel = LOCAL_QUORUM, используя приведенный ниже фрагмент кода.
QueryOptions option = new QueryOptions();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
final Cluster cluster =
Cluster.builder()
.addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withQueryOptions(option) // NOTE
.withAuthProvider(
new PlainTextAuthProvider(
"username",
"password"))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy.builder().withLocalDc("ap-northeast-1").build())
.build();
final Session session = cluster.connect("test");
Когда я попробовал то же самое в Flink, я получаю следующее сообщение об ошибке:
Исключение в потоке "основной" организации. apache .flink.api.common.InvalidProgramException: com.datastax.driver .core. QueryOptions@130161f7 не сериализуется. Объект, вероятно, содержит или ссылается на не сериализуемые поля.
Есть что-нибудь, что я пропускаю? Пожалуйста, подробно расскажите, как подключиться / записать в MCS с помощью разъема Flink Cassandra.
PS:
- Я использовал приведенную ниже команду для создания пространства ключей.
CREATE KEYSPACE "test"
WITH
REPLICATION = {'class': 'SingleRegionStrategy'}
Я не использовал AmazonRootCA1.pem в своем коде.
Я не использую cassandra_truststore.jks в своем коде или среде.
У меня есть установленный сертификат temp_file.der
сертификат, который был создан с помощью следующих этих шагов.
Я использую Flink 1.8.2, поскольку версия среды, доступная в Kinesis Data Analytics
ОБНОВЛЕНИЕ 07-04-2020
Я могу исправить проблему сериализации, создав Serializable оболочку для QueryOptions. Пожалуйста, найдите фрагмент кода ниже:
import com.datastax.driver.core.QueryOptions;
import java.io.Serializable;
public class QueryOptionsSerializable extends QueryOptions implements Serializable {
private static final long serialVersionUID = 2793938419775311824L;
}
С этим решением мне удалось установить уровень согласованности в коде LOCAL_QUORUM и запускать без каких-либо исключений.
// Setting consistency level
QueryOptionsSerializable option = new QueryOptionsSerializable();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
CassandraSink.addSink(entityStream)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
Cluster.Builder tempBuilder = builder.addContactPoint(host).withPort(port);
if (isSSLEnabled) {
// enable SSL config if isSSLEnabled flag is ON.
tempBuilder.withSSL();
}
if (username != null && password != null) {
// if username & password is provided, use it for connection.
tempBuilder.withCredentials(username, password);
}
tempBuilder.withQueryOptions(option);
return tempBuilder.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.setDefaultKeyspace(keyspace)
.build()
.name("Write to Cassandra")
.uid("cassandra_sink");
Но пока при записи в MCS я получаю ту же ошибку:
com.datastax.driver.core.exceptions.InvalidQueryException: уровень согласования LOCAL_ONE не поддерживается для этой операции. Поддерживаемые уровни согласованности: LOCAL_QUORUM
Любая помощь будет принята с благодарностью!