сохранение набора данных в кассандру с использованием java искры - PullRequest
0 голосов
/ 08 мая 2020

Я пытаюсь сохранить набор данных в cassandra db, используя java spark. Я могу успешно прочитать данные в наборе данных, используя приведенный ниже код

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

Но когда я пытаюсь записать набор данных, я получаю IOException: не удалось загрузить или найти таблицу, найдены похожие таблицы в keypace

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

Я устанавливаю хост и порт в режиме Sparksession Дело в том, что я могу писать в режимах перезаписи и добавления, но не могу создать таблицу

Версии, которые Я использую следующие: spark java 2.0 Spark cassandra connector 2.3

Пробовал с разными версиями jar, но ничего не получалось. Я также прошел через различные переполнение стека и ссылки на github

Любая помощь очень признателен.

1 Ответ

0 голосов
/ 11 мая 2020

Операция write в Spark не имеет режима, который автоматически создает для вас таблицу - для этого есть несколько причин. Один из них заключается в том, что вам необходимо определить первичный ключ для вашей таблицы, иначе вы можете просто перезаписать данные, если вы установили неправильный первичный ключ. По этой причине Spark Cassandra Connector предоставляет отдельный метод для создания таблицы на основе структуры вашего фрейма данных , но вам необходимо предоставить список ключевых столбцов секционирования и кластеризации. В Java это будет выглядеть следующим образом (полный код здесь ):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
          CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
          partitionSeqlist, clusteringSeqlist, connector);

, а затем вы можете записывать данные как обычно:

dataset.write()
   .format("org.apache.spark.sql.cassandra")
   .options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
   .save();
...