Я только начал свое путешествие по ETL, используя pyspark. Моя текущая цель - записать данные из .csv в dashdb, используя режим добавления. Однако я столкнулся с проблемой, которую не могу решить. Вот что я сделал до сих пор:
Я прочитал .csv и зарегистрировал его во временной таблице, чтобы иметь возможность вызывать SQL запросов к нему. Результат запроса следующий:
+--------------------+--------+-------+-----+------+
| Street|District|Area_m2|Rooms| Price|
+--------------------+--------+-------+-----+------+
| Angyalföld| XIII| 105| 2|320000|
| Belváros| V| 70| 2|230000|
| Pozsonyi út| XIII| 89| 2|290000|
| Fecske utca| VIII| 33| 1|130000|
|Margó Tivadar utc...| XVIII| 80| 2|220000|
| Orczy út 46-48| VIII| 44| 2|120000|
| Vaskapu utca| IX| 51|1 + 1|185000|
| Gubacsi út 19| IX| 30| 1|105000|
| Öv utca 133| XIV| 29| 1|150000|
| Mérleg utca| V| 54| 2|190000|
| Szirtes út| I| 160| 4|389000|
| Gubacsi út 19| IX| 50| 2|130000|
| Török utca| II| 53|1 + 1|165000|
| Ferenc tér| IX| 65| 2|235000|
| Kiscelli utca| III| 34| 1|190000|
| Dózsa György út| VII| 47|1 + 1|130000|
| Vadász utca| V| 60|1 + 1|185000|
| István utca 7.| VII| 30| 1|120000|
| Regős utca| XI| 53| 2|180000|
| Országház utca| I| 122| 3|680000|
+--------------------+--------+-------+-----+------+
Столбцы в таблице db2 имеют следующие типы: varchar (50), varchar (10), integer, varchar (10), integer. Я сохранил вывод в переменной и использовал следующий код для записи в dashdb:
output.write.jdbc(jdbc_url, table, properties = connection_properties, mode = 'append')
При запуске этого кода выдается следующее сообщение об ошибке:
An error occurred while calling o310.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure. The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
at com.ibm.db2.jcc.am.fd.a(fd.java:404)
at com.ibm.db2.jcc.am.o.a(o.java:381)
at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
at com.ibm.db2.jcc.am.fd.a(fd.java:668)
at com.ibm.db2.jcc.am.fd.a(fd.java:60)
at com.ibm.db2.jcc.am.fd.a(fd.java:127)
at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
at com.ibm.db2.jcc.t4.q.a(q.java:57)
at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
... 17 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:515)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure. The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
at com.ibm.db2.jcc.am.fd.a(fd.java:404)
at com.ibm.db2.jcc.am.o.a(o.java:381)
at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
at com.ibm.db2.jcc.am.fd.a(fd.java:668)
at com.ibm.db2.jcc.am.fd.a(fd.java:60)
at com.ibm.db2.jcc.am.fd.a(fd.java:127)
at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
at com.ibm.db2.jcc.t4.q.a(q.java:57)
at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
... 17 more
Может кто-нибудь объяснить, что пошло не так и как можно решить проблему? Заранее спасибо!