Искра setCassandraConf не работает должным образом - PullRequest
0 голосов
/ 18 октября 2018

Я использую .setCassandraConf (c_options_conf), чтобы установить sparkSession для подключения кластера кассандры, как показано ниже.

Работает нормально:

 val spark = SparkSession
      .builder()
      .appName("DatabaseMigrationUtility")
      .config("spark.master",devProps.getString("deploymentMaster"))
      .getOrCreate()
                .setCassandraConf(c_options_conf)

Если я сохраню таблицу, используя объект записи данных, как показано нижеон указывает на сконфигурированный кластер и прекрасно сохраняет сохранение в Cassandra, как показано ниже

 writeDfToCassandra(o_vals_df, key_space , "model_vals"); //working fine using o_vals_df.

Но если сказать, как показано ниже, он указывает на localhost вместо кластера cassandra и не может сохранить.

Неработает:

import spark.implicits._
val sc = spark.sparkContext

val audit_df = sc.parallelize(Seq(LogCaseClass(columnFamilyName, status,
      error_msg,currentDate,currentTimeStamp, updated_user))).saveToCassandra(keyspace, columnFamilyName);

Выдает ошибку при попытке подключения localhost.

Ошибка:

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (tried: localhost/127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException:
[localhost/127.0.0.1:9042] Cannot connect))
            at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)

Что здесь не так?Почему он указывает на локальный хост по умолчанию, несмотря на то, что для sparkSession установлено значение кластера cassandra и более ранний метод работает нормально.

Ответы [ 2 ]

0 голосов
/ 06 ноября 2018

Нам нужно установить конфигурацию, используя два метода установки SparkSession, то есть .config(conf) и .setCassandraConf(c_options_conf) с такими же значениями, как показано ниже

  val spark = SparkSession
        .builder()
        .appName("DatabaseMigrationUtility")
        .config("spark.master",devProps.getString("deploymentMaster"))
        .config("spark.dynamicAllocation.enabled",devProps.getString("spark.dynamicAllocation.enabled"))
        .config("spark.executor.memory",devProps.getString("spark.executor.memory"))
        .config("spark.executor.cores",devProps.getString("spark.executor.cores"))
        .config("spark.executor.instances",devProps.getString("spark.executor.instances"))
        .config(conf)

        .getOrCreate()
        .setCassandraConf(c_options_conf)

Тогда я бы работал и для последних API-интерфейсов Кассандрыкак RDD / DF Api.

0 голосов
/ 19 октября 2018

Настройка IP через spark.cassandra.connection.host Свойство Spark (не через setCassandraConf!) Работает как для RDD, так и для DataFrames.Это свойство может быть установлено из командной строки при отправке задания или явно (пример из документации):

val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "192.168.123.10")
    .set("spark.cassandra.auth.username", "cassandra")            
    .set("spark.cassandra.auth.password", "cassandra")

val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf)

Ознакомьтесь с документацией для соединителя , включая ссылку на существующий свойства конфигурации .

...