Dasastax искровой соединитель кассандры с RetryPolicy для записи DF в таблицу кассандры - PullRequest
0 голосов
/ 15 октября 2019

Я пытаюсь записать искровой датафрейм на кассандру с уровнем согласованности "EACH_QUORUM". Мой код выглядит следующим образом:

val sparkBuilder = SparkSession.builder().
  config(cassandraHostPropertyProperty, cassandraHosts).
  config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
  config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
  config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
  getOrCreate();

Ниже приведен код для записи DF:

df.write.cassandraFormat(tableName, keySpaceName)
    .mode(SaveMode.Append)
    .options(Map(
      WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
      WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
    ))
    .save()

Я хочу добавить политику повторов, чтобы, если один из центров обработки данных был отключен,записывать понижает согласованность до LOCAL_QUORUM.

Я знаю, что у datastax есть класс MultipleRetryPolicy.scala , который я должен расширить, переопределить методы, чтобы добавить собственную логику и использовать ее экземпляр в cassandra conf.

Как я могу применить эту политику к моей свече или операции сохранения? Есть ли другой способ в scala с использованием RetryPolicy или без него для достижения моих требований?

1 Ответ

0 голосов
/ 16 октября 2019

Вы не хотите MultipleRetryPolicy, вы после DowngradingConsistencyRetryPolicy , который не является частью драйвера спрэка, поэтому выполнение этого как части настроек драйвера не выполняется, если вы не перенесете политику наСкала.

Что вы можете сделать, это обернуть выполнение запроса в попытку и поймать UnavailableException, а затем просто повторите попытку с меньшей согласованностью, изменив параметр output.consistency.level .

...