Пакет Spark для переноса данных между 2 кластерами кассандры - PullRequest
1 голос
/ 07 ноября 2019

Я использую spark для перемещения некоторых данных из одной таблицы cassandra в другую таблицу cassandra в другом кластере.

Я указал конфигурацию cassandra для одного из исходных кластеров со следующими параметрами:

/*
spark.cassandra.connection.host: 
spark.cassandra.connection.port:
spark.cassandra.auth.username:
spark.cassandra.auth.password:
spark.cassandra.connection.ssl.clientAuth.enabled: true
spark.cassandra.connection.ssl.enabled: true
spark.cassandra.connection.ssl.trustStore.path: 
spark.cassandra.connection.ssl.trustStore.password: 
spark.cassandra.connection.timeout_ms: */

SparkSession spark = SparkSession.builder()
            .config(conf)
            .getOrCreate();

Dataset<Row> df = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .options(config.getSourceTable())
            .load();
df.show();

// *** How/Where do I specify cassandra config in destination cluster? ***
df.write()
        .mode(SaveMode.Append)
        .format("org.apache.spark.sql.cassandra")
        .options(destinationTbl);

Как / Где я могу указать конфигурацию cassandra в целевом кластере (Java Perferred)?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 07 ноября 2019

У меня был похожий вариант использования, но в моем случае я не смог установить соединение со 2-м кластером, используя метод, предложенный Алекс из-за некоторой проблемы с соединителем. Поэтому мне пришлось преобразовать этот DataFrame в RDD и использовать методы RDD, чтобы записать его во 2-й кластер Cassandra

Передать все сведения о соединителе Cassandra в другой файл sparkConfig и проанализировать его с помощью CassandraConnector.

{    
val cluster: CassandraConnector = CassandraConnector(sparkConfig)

      implicit val c: CassandraConnector = cluster

      dataFrame
        .rdd
        .saveToCassandra(keySpaceName, tableName, SomeColumns(ListOfColumns)
}
0 голосов
/ 07 ноября 2019

Я не проверял его, но на основе сообщения в блоге Рассела Спитцера вы можете сделать следующее (не проверено в Java, но должно работать):

  • Set 2Опции конфигурации (или добавить их при создании spark экземпляра):
spark.setConf("ClusterSource/spark.cassandra.connection.host", "127.0.0.1");
spark.setConf("ClusterDestination/spark.cassandra.connection.host", "127.0.0.2");
  • Добавить в options имя соответствующего кластера в виде записи cluster.

PS Кроме того, помните, что если вам нужно будет перенести данные и сохранить для них WriteTime и / или TTL, вам нужно будет использовать RDD API, так как эти вещи не поддерживаются вAPI DataFrame.

...