Я пытаюсь записать искровой датафрейм на кассандру с уровнем согласованности "EACH_QUORUM". Мой код выглядит следующим образом:
val sparkBuilder = SparkSession.builder().
config(cassandraHostPropertyProperty, cassandraHosts).
config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
getOrCreate();
Ниже приведен код для записи DF:
df.write.cassandraFormat(tableName, keySpaceName)
.mode(SaveMode.Append)
.options(Map(
WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
))
.save()
Я хочу добавить политику повторов, чтобы, если один из центров обработки данных был отключен,записывать понижает согласованность до LOCAL_QUORUM.
Я знаю, что у datastax есть класс MultipleRetryPolicy.scala , который я должен расширить, переопределить методы, чтобы добавить собственную логику и использовать ее экземпляр в cassandra conf.
Как я могу применить эту политику к моей свече или операции сохранения? Есть ли другой способ в scala с использованием RetryPolicy или без него для достижения моих требований?