Как написать в Управление службой Cassandra с помощью Flink Cassandra Connector? - PullRequest
0 голосов
/ 05 апреля 2020

Я могу подключиться к AWS Управляемой службе Cassandra, используя приведенный ниже фрагмент кода.

CassandraSink.addSink(cassandraEntityStream)
    .setClusterBuilder(
        new ClusterBuilder() {

            private static final long serialVersionUID = 2793938419775311824L;

            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder
                    .addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
                    .withPort(9142)
                    .withSSL()
                    .withCredentials(
                        "username",
                        "password")
                    .withLoadBalancingPolicy(
                        DCAwareRoundRobinPolicy
                            .builder()
                            .withLocalDc("ap-northeast-1")
                            .build())
                    //.withQueryOptions(option)
                    .build();
            }
        })
    .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
    .build()
    .name("Write to Cassandra")
    .uid("cassandra_sink");

При получении потока POJO для Cassandra я получил следующее исключение:

com.datastax.driver.core.exceptions.InvalidQueryException: уровень согласованности LOCAL_ONE не поддерживается для этой операции. Поддерживаемые уровни согласованности: LOCAL_QUORUM

Мне удалось решить эту проблему в другом проекте (без использования flink), установив ConsistencyLevel = LOCAL_QUORUM, используя приведенный ниже фрагмент кода.

QueryOptions option = new QueryOptions();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

final Cluster cluster =
    Cluster.builder()
        .addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
        .withPort(9142)
        .withSSL()
        .withQueryOptions(option) // NOTE
        .withAuthProvider(
            new PlainTextAuthProvider(
                "username",
                "password"))
        .withLoadBalancingPolicy(
            DCAwareRoundRobinPolicy.builder().withLocalDc("ap-northeast-1").build())
        .build();
final Session session = cluster.connect("test");

Когда я попробовал то же самое в Flink, я получаю следующее сообщение об ошибке:

Исключение в потоке "основной" организации. apache .flink.api.common.InvalidProgramException: com.datastax.driver .core. QueryOptions@130161f7 не сериализуется. Объект, вероятно, содержит или ссылается на не сериализуемые поля.

Есть что-нибудь, что я пропускаю? Пожалуйста, подробно расскажите, как подключиться / записать в MCS с помощью разъема Flink Cassandra.

PS:

  1. Я использовал приведенную ниже команду для создания пространства ключей.
    CREATE KEYSPACE "test"
    WITH
        REPLICATION = {'class': 'SingleRegionStrategy'}

Я не использовал AmazonRootCA1.pem в своем коде.

Я не использую cassandra_truststore.jks в своем коде или среде.

У меня есть установленный сертификат temp_file.der сертификат, который был создан с помощью следующих этих шагов.

Я использую Flink 1.8.2, поскольку версия среды, доступная в Kinesis Data Analytics

ОБНОВЛЕНИЕ 07-04-2020

Я могу исправить проблему сериализации, создав Serializable оболочку для QueryOptions. Пожалуйста, найдите фрагмент кода ниже:

import com.datastax.driver.core.QueryOptions;
import java.io.Serializable;

public class QueryOptionsSerializable extends QueryOptions implements Serializable {

  private static final long serialVersionUID = 2793938419775311824L;
}

С этим решением мне удалось установить уровень согласованности в коде LOCAL_QUORUM и запускать без каких-либо исключений.

    // Setting consistency level
    QueryOptionsSerializable option = new QueryOptionsSerializable();
    option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);


    CassandraSink.addSink(entityStream)
        .setClusterBuilder(
            new ClusterBuilder() {

              private static final long serialVersionUID = 2793938419775311824L;

              @Override
              public Cluster buildCluster(Cluster.Builder builder) {
                Cluster.Builder tempBuilder = builder.addContactPoint(host).withPort(port);

                if (isSSLEnabled) {
                  // enable SSL config if isSSLEnabled flag is ON.
                  tempBuilder.withSSL();
                }

                if (username != null && password != null) {
                  // if username & password is provided, use it for connection.
                  tempBuilder.withCredentials(username, password);
                }

                tempBuilder.withQueryOptions(option);

                return tempBuilder.build();
              }
            })
        .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
        .setDefaultKeyspace(keyspace)
        .build()
        .name("Write to Cassandra")
        .uid("cassandra_sink");

Но пока при записи в MCS я получаю ту же ошибку:

com.datastax.driver.core.exceptions.InvalidQueryException: уровень согласования LOCAL_ONE не поддерживается для этой операции. Поддерживаемые уровни согласованности: LOCAL_QUORUM

Любая помощь будет принята с благодарностью!

1 Ответ

2 голосов
/ 07 апреля 2020

Наконец разобрался. Речь шла о согласованности настроек с использованием аннотации @Table.

Ниже приведен фрагмент кода:

@Table(name = "report", readConsistency = "LOCAL_QUORUM", writeConsistency = "LOCAL_QUORUM")
public class SampleEntity {

  @Column(name = "user_id")
  @PartitionKey(0)
  private String userId;


  @Column(name = "join_date")
  @PartitionKey(0)
  private String joinDate;

}

...