Как записать фрейм данных, содержащий структуру, в cassandratable - PullRequest
0 голосов
/ 16 апреля 2019

Я пытаюсь записать данные фрейма данных в таблицу cassandra.Как мне отобразить структуру в dataframe на столбец в таблице cassandra.

Схема кадра данных

  root
   |-- _1: struct (nullable = true)
    |    |-- _1: string (nullable = true)
    |    |-- _2: integer (nullable = false)
    |    |-- _3: integer (nullable = false)
    |    |-- _4: integer (nullable = false)
    |-- _2: double (nullable = false)

Описание CassandraTable (.. Я пытаюсь использовать кортеж для сохранения данных структуры)

  create table knmi_r (idnkey tuple<text,int,int,int>, logval int, primary key (idnkey) ) ;

ниже мой код длясохранить в таблицу cassandra

              resultdf.printSchema()
              resultdf.write
                      .format("org.apache.spark.sql.cassandra")
                      .options(Map("table" -> "knmi_r", "keyspace" -> "dbks1"))
                      .save() 

Однако я получаю ошибку ниже.Может кто-нибудь, пожалуйста, подскажите мне, как пройти через это.

              Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dbks1.knmi_r: _1, _2
                at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:44)
                at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:385)
                at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
                at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
                at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:90)
                at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
                at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
                at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
                at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
                at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
                at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
                at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
                at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
                at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
                at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
                at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
                at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
                at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
                at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
                at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
                at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
                at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
                at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
                at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
                at nl.rug.sc.SparkExample.testExample(SparkExample.scala:286)
                at nl.rug.sc.app.SparkBootcamp$class.run(SparkBootcamp.scala:19)
                at nl.rug.sc.app.SparkLocalMain$.run(SparkLocalMain.scala:6)
                at nl.rug.sc.app.SparkLocalMain$.delayedEndpoint$nl$rug$sc$app$SparkLocalMain$1(SparkLocalMain.scala:18)
                at nl.rug.sc.app.SparkLocalMain$delayedInit$body.apply(SparkLocalMain.scala:6)
                at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
                at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
                at scala.App$$anonfun$main$1.apply(App.scala:76)
                at scala.App$$anonfun$main$1.apply(App.scala:76)
                at scala.collection.immutable.List.foreach(List.scala:392)
                at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
                at scala.App$class.main(App.scala:76)
                at nl.rug.sc.app.SparkLocalMain$.main(SparkLocalMain.scala:6)
                at nl.rug.sc.app.SparkLocalMain.main(SparkLocalMain.scala)

1 Ответ

0 голосов
/ 16 апреля 2019

Ошибка из-за несоответствия между именами схем и именами столбцов кассандры. Я просто переименовал их, и это сработало

                val newNames = Seq("idnkey", "logval")

                val resultdf1 = resultdf.toDF(newNames: _*)

                resultdf1.printSchema

теперь схема данных соответствует таблице cassandra

    root
 |-- idnkey: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: integer (nullable = false)
 |    |-- _3: integer (nullable = false)
 |    |-- _4: integer (nullable = false)
 |-- logval: double (nullable = false)
...