Я ищу, чтобы сохранить весь Dataframe в цикле foreachpartition на Cassandra.
Я знаю, что могу получить Cassandra Connector в пределах foreachpartition и выполнить операторы CRUD на cassandra, используя следующий код:
val conf: SparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", "IP")
.set("spark.cassandra.auth.username", "username")
.set("spark.cassandra.auth.password", "pwd")
val cdbConnector = CassandraConnector(conf)
cdbConnector.withSessionDo(session =>
session.execute(//Insert statement)
)
Но меня больше интересует хранение всего Dataframe в Cassandraтаблица за один раз в пределах foreachpartition.
Кроме того, когда я выполняю приведенную ниже инструкцию для создания фрейма данных и записываю его в Cassandra в пределах foreachpartition, мое приложение зависает, и все потоки находятся в состоянии ожидания.
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "***", "keyspace" -> "***")).save()