вычисление среднего значения, отклонения и стандартного отклонения от набора данных тренировки с использованием Spark Scala - PullRequest
0 голосов
/ 27 февраля 2020
I have a  dataframe  :
+----------------+----------------+---------------------+---------------+--------------------+-----+-
|origin_longitude|dest_longitude |origin_latitude|destination_latitude|speed|Distance|

 -7.1732833      |     -7.1732833|     32.0414966|          32.0414966|    50|     20.0|
 -7.1732833      |     -7.1732833|     32.0414966|          32.0414966|    40|     2.50|
 -7.1732833      |     -7.1732833|     32.0414966|          32.0414966|    30|     3.0 |
 -7.1732833      |     -7.1732833|     32.0414966|          32.0414966|    10|     98.0|
 -7.1732833      |     -7.1732833|     32.0414966|          32.0414966|    10|     3.80|

Я хочу применить нормальный закон к столбцу «Расстояние» в DataFrame, и для этого я должен сначала разделить набор данных на тренировочные данные и тестовые данные, а затем я должен вычислить среднее (среднее) и изменение данных обучения. Таким образом, чтобы разделить данные, я сделал это следующим образом:

val Array(trainingData, testData) = DF.randomSplit(Array(0.7 , 0.3), seed = 1234L)

Чтобы рассчитать среднее значение, я сделал так:

trainingData.toDF().agg(avg(col("Distance"))).show()

Я получаю эту ошибку:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:173)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:313)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:405)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
at org.apache.spark.sql.Dataset.show(Dataset.scala:816)
at org.apache.spark.sql.Dataset.show(Dataset.scala:775)
at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
at test$.main(test.scala:111)
at test.main(test.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef

Должен ли я действовать так же, как я, и есть ли у вас какие-либо идеи относительно того, что мне следует сделать, чтобы решить эту проблему. Спасибо.

1 Ответ

0 голосов
/ 29 февраля 2020

Я решил проблему, изменив код следующим образом:

val splits =k.cache().randomSplit(Array(0.7, 0.3), seed = 11L)
 val training = splits(0)
 val test =splits(1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...