Python Spark: an example of an invalid value is data of numeric type with scale greater than precision
0 votes / 11 March 2020

I have a Spark DataFrame with the following contents:

date          time       RY    SAD  SAF 24hrs_v 24hrs_I Normalized_SAD              Normalized_SAF
2008-04-02   07:30:00   2009    0   0   2176    28           0                          0
2008-04-02   08:00:00   2009    0   0   2176    28           0                          0
2008-04-02   08:30:00   2009    0   0   2176    28           0                          0
2008-04-02   09:00:00   2009    0   0   2176    28           0                          0
2008-04-02   09:30:00   2009    240 4   2416    32           0.11854166666666667        4
2008-04-02   10:00:00   2009    0   0   1560    28           0                          0
2008-04-02   10:30:00   2009    240 4   1800    32           0.11854166666666667        4
2008-04-02   11:00:00   2009    0   0   1800    32           0                          0
2008-04-02   11:30:00   2009    0   0   1800    32           0                          0
2008-04-02   12:00:00   2009    0   0   1800    32           0                          0
2008-04-02   12:30:00   2009    0   0   1800    32           0                          0
2008-04-02   13:00:00   2009    0   0   1800    32           0                          0
2008-04-02   13:30:00   2009    0   0   1800    32           0                          0
2008-04-02   14:00:00   2009    0   0   1800    32           0                          0
2008-04-03   09:30:00   2009    10688   1308    12           0.11854166666666667        8
2008-04-03   10:00:00   2009    872 4   2180    16           0.11854166666666667        4

The DataFrame's schema is:

date    string
time    string
RY  bigint
SAD double
SAF double
24hrs_v double
24hrs_I double
Normalized_SAD  double
Normalized_SAF  double

I tried to write the DataFrame to SQL Server using the following command:

spark_df.write.jdbc(jdbcUrl, table=tableName, mode='overwrite', properties=connectionProperties)

When I run the above command, it throws the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 10.0 failed 4 times, most recent failure: Lost task 4.3 in stage 10.0 (TID 39, 10.139.64.6, executor 3): com.microsoft.sqlserver.jdbc.SQLServerException: The incoming tabular data stream (TDS) remote procedure call (RPC) protocol stream is incorrect. Parameter 5 (""): The supplied value is not a valid instance of data type float. Check the source data for invalid values. An example of an invalid value is data of numeric type with scale greater than precision. 

Can anyone tell me where I am going wrong?
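One thing I have considered checking (a guess, not confirmed from the error alone): SQL Server's float type cannot store NaN or +/-Infinity, and if the parameters are bound in the column order shown above, Parameter 5 would correspond to SAF, one of the double columns. A minimal sketch to look for such values in spark_df:

from pyspark.sql import functions as F

# Double columns that map to SQL Server float in the target table.
double_cols = ["SAD", "SAF", "24hrs_v", "24hrs_I",
               "Normalized_SAD", "Normalized_SAF"]

# Count NaN and +/-Infinity values per column; SQL Server's float type
# cannot represent either, which could plausibly trigger this TDS error.
spark_df.select([
    F.count(F.when(F.isnan(F.col(c)) |
                   (F.abs(F.col(c)) == float("inf")), c)).alias(c)
    for c in double_cols
]).show()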

Here is the full error traceback:

Py4JJavaError                             Traceback (most recent call last)
<command-1691615560915027> in <module>
----> 1 spark_df.write.jdbc(jdbcUrl, table="dbo.NormalisedUnplannedDPP3",mode='overwrite',properties=connectionProperties)

/databricks/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
    985         for k in properties:
    986             jprop.setProperty(k, properties[k])
--> 987         self.mode(mode)._jwrite.jdbc(url, table, jprop)
    988 
    989 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o4210.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 9.0 failed 4 times, most recent failure: Lost task 4.3 in stage 9.0 (TID 51, 10.139.64.5, executor 0): com.microsoft.sqlserver.jdbc.SQLServerException: The incoming tabular data stream (TDS) remote procedure call (RPC) protocol stream is incorrect. Parameter 5 (""): The supplied value is not a valid instance of data type float. Check the source data for invalid values. An example of an invalid value is data of numeric type with scale greater than precision.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatementBatch(SQLServerPreparedStatement.java:2482)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtBatchExecCmd.doExecute(SQLServerPreparedStatement.java:2383)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2294)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:843)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:843)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:987)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:987)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2321)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2321)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:533)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:987)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:985)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:985)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:843)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:63)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:147)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:188)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:112)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:171)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:549)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The incoming tabular data stream (TDS) remote procedure call (RPC) protocol stream is incorrect. Parameter 5 (""): The supplied value is not a valid instance of data type float. Check the source data for invalid values. An example of an invalid value is data of numeric type with scale greater than precision.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1535)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatementBatch(SQLServerPreparedStatement.java:2482)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtBatchExecCmd.doExecute(SQLServerPreparedStatement.java:2383)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2294)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:843)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:843)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:987)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:987)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2321)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2321)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:533)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Any help would be appreciated.
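In case it helps to narrow this down, here is a workaround sketch I am considering: nulling out any NaN or infinite doubles before the JDBC write. This assumes NULLs are acceptable in the target table; it is not a confirmed fix.

from pyspark.sql import functions as F

double_cols = ["SAD", "SAF", "24hrs_v", "24hrs_I",
               "Normalized_SAD", "Normalized_SAF"]

# Replace NaN / +/-Infinity with NULL in each double column, since
# SQL Server float cannot store them.
cleaned_df = spark_df
for c in double_cols:
    is_bad = F.isnan(F.col(c)) | (F.abs(F.col(c)) == float("inf"))
    cleaned_df = cleaned_df.withColumn(
        c, F.when(is_bad, F.lit(None)).otherwise(F.col(c))
    )

cleaned_df.write.jdbc(jdbcUrl, table=tableName, mode='overwrite',
                      properties=connectionProperties)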
