Writing to a cloud DB2 table using pyspark
0 votes
/ 23 January 2020

I have just started my ETL journey with pyspark. My current goal is to write data from a .csv file to dashdb using append mode. However, I have run into a problem I cannot solve. Here is what I have done so far:

I read the .csv and registered it as a temporary table so that I could run SQL queries against it (a minimal sketch of this step follows the result table below). The query result looks like this:

+--------------------+--------+-------+-----+------+
|              Street|District|Area_m2|Rooms| Price|
+--------------------+--------+-------+-----+------+
|          Angyalföld|    XIII|    105|    2|320000|
|            Belváros|       V|     70|    2|230000|
|         Pozsonyi út|    XIII|     89|    2|290000|
|         Fecske utca|    VIII|     33|    1|130000|
|Margó Tivadar utc...|   XVIII|     80|    2|220000|
|      Orczy út 46-48|    VIII|     44|    2|120000|
|        Vaskapu utca|      IX|     51|1 + 1|185000|
|       Gubacsi út 19|      IX|     30|    1|105000|
|         Öv utca 133|     XIV|     29|    1|150000|
|         Mérleg utca|       V|     54|    2|190000|
|          Szirtes út|       I|    160|    4|389000|
|       Gubacsi út 19|      IX|     50|    2|130000|
|          Török utca|      II|     53|1 + 1|165000|
|          Ferenc tér|      IX|     65|    2|235000|
|       Kiscelli utca|     III|     34|    1|190000|
|     Dózsa György út|     VII|     47|1 + 1|130000|
|         Vadász utca|       V|     60|1 + 1|185000|
|      István utca 7.|     VII|     30|    1|120000|
|          Regős utca|      XI|     53|    2|180000|
|      Országház utca|       I|    122|    3|680000|
+--------------------+--------+-------+-----+------+
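For reference, the read-and-register step looked roughly like the following sketch; the file name, view name, and read options here are assumptions, not the exact originals:

from pyspark.sql import SparkSession

# Build (or reuse) a Spark session
spark = SparkSession.builder.appName("csv-to-dashdb").getOrCreate()

# Read the .csv with a header row and inferred column types
# (file name and options are placeholders)
flats = spark.read.csv("budapest_flats.csv", header=True, inferSchema=True)

# Register a temporary view so the data can be queried with SQL
flats.createOrReplaceTempView("flats")

output = spark.sql("SELECT Street, District, Area_m2, Rooms, Price FROM flats")
output.show()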

The columns of the DB2 table have the following types: varchar(50), varchar(10), integer, varchar(10), integer. I saved the query output in a variable and used the following code to write it to dashdb:

output.write.jdbc(jdbc_url, table, mode='append', properties=connection_properties)
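For context, the connection variables referenced above would have been set up roughly like this; the host, port, database, table name, and credentials are placeholders rather than the real values (com.ibm.db2.jcc.DB2Driver is the standard Db2 JDBC driver class):

# Placeholder connection details for the dashdb (Db2 on Cloud) instance
jdbc_url = "jdbc:db2://<host>:50000/BLUDB"
table = "<SCHEMA>.FLATS"
connection_properties = {
    "user": "<username>",
    "password": "<password>",
    "driver": "com.ibm.db2.jcc.DB2Driver"
}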

Running this write produces the following error message:

An error occurred while calling o310.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.fd.a(fd.java:404)
    at com.ibm.db2.jcc.am.o.a(o.java:381)
    at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
    at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
    at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
        at com.ibm.db2.jcc.am.fd.a(fd.java:668)
        at com.ibm.db2.jcc.am.fd.a(fd.java:60)
        at com.ibm.db2.jcc.am.fd.a(fd.java:127)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
        at com.ibm.db2.jcc.t4.q.a(q.java:57)
        at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
        at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
        at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
        at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
        ... 17 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:515)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][3.62.56] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.fd.a(fd.java:404)
    at com.ibm.db2.jcc.am.o.a(o.java:381)
    at com.ibm.db2.jcc.am.kn.a(kn.java:4523)
    at com.ibm.db2.jcc.am.kn.c(kn.java:4294)
    at com.ibm.db2.jcc.am.kn.executeBatch(kn.java:2600)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
    Suppressed: com.ibm.db2.jcc.am.SqlDataException: Error for batch element #66: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=3.62.56
        at com.ibm.db2.jcc.am.fd.a(fd.java:668)
        at com.ibm.db2.jcc.am.fd.a(fd.java:60)
        at com.ibm.db2.jcc.am.fd.a(fd.java:127)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:481)
        at com.ibm.db2.jcc.t4.cb.a(cb.java:70)
        at com.ibm.db2.jcc.t4.q.a(q.java:57)
        at com.ibm.db2.jcc.t4.sb.a(sb.java:225)
        at com.ibm.db2.jcc.am.kn.a(kn.java:3083)
        at com.ibm.db2.jcc.am.kn.d(kn.java:5019)
        at com.ibm.db2.jcc.am.kn.a(kn.java:4466)
        ... 17 more

Can anyone explain what went wrong and how the problem can be solved? Thanks in advance!

1 Answer

0 votes
/ 24 January 2020

The problem can be solved by making sure the .csv file is UTF-8 encoded and by removing special characters. The suppressed exception (SQLCODE=-302, SQLSTATE=22001) indicates that a string value was too long for its target column, which can happen when mis-encoded or special characters inflate a value beyond the varchar limit.
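A minimal sketch of that fix, reusing the names from the question above; the exact cleaning rules (stripping control characters and truncating to the varchar widths) are one possible interpretation, not the asker's actual code:

from pyspark.sql import functions as F

# Re-read the .csv with an explicit UTF-8 encoding
flats = spark.read.csv("budapest_flats.csv", header=True,
                       inferSchema=True, encoding='UTF-8')

# Strip control characters and truncate strings to the DB2 column widths:
# varchar(50) for Street, varchar(10) for District and Rooms
clean = (flats
    .withColumn("Street", F.substring(F.regexp_replace("Street", r"[\x00-\x1F\x7F]", ""), 1, 50))
    .withColumn("District", F.substring("District", 1, 10))
    .withColumn("Rooms", F.substring(F.col("Rooms").cast("string"), 1, 10)))

clean.write.jdbc(jdbc_url, table, mode='append', properties=connection_properties)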

...