Вы не можете удалить через DF API, и это неестественно через API RDD.RDD и DF являются неизменяемыми, что означает отсутствие изменений.Вы можете отфильтровать их, чтобы сократить их, но это создаст новый RDD / DF.
Сказав, что , вы можете отфильтровать строки, которые вы хотите удалить, а затем просто создать клиент C * для выполнения этого удаления:
// import для подключения Spark и C *. org.apache.spark.sql.cassandra._ import com.datastax.spark.connector.cql.CassandraConnectorConf
spark.setCassandraConf("Test Cluster", CassandraConnectorConf.ConnectionHostParam.option("localhost"))
val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "books_ks", "table" -> "books")).load()
val dfToDelete = df.filter($"price" < 3).select($"price");
dfToDelete.show();
// import for C* client
import com.datastax.driver.core._
// build a C* client (part of the dependency of the scala driver)
val clusterBuilder = Cluster.builder().addContactPoints("127.0.0.1");
val cluster = clusterBuilder.build();
val session = cluster.connect();
// loop over everything that you filtered in the DF and delete specified row.
for(price <- dfToDelete.collect())
session.execute("DELETE FROM books_ks.books WHERE price=" + price.get(0).toString);
Несколько предупреждений Этоне будет работать хорошо, если вы пытаетесь удалить большую часть строк.Использование сбора здесь означает, что эта работа будет выполнена в программе драйвера Spark, также известной как SPOF и бутылочное вырезание.
Лучшим способом сделать это было бы: а) определить DF UDF для выполнения удаления, преимущество в том, что вы получите распараллеливание.Вариант б) до уровня RDD и просто удалить, как вы показали выше.
Мораль истории, просто потому что это может быть сделано, не означает, что это должно быть сделано.