I'm trying to run a bulk insert using Scala and the Spark Connector from Azure Databricks. I'm getting "connection is closed" errors from SQL Server. Some of the data does make it into the destination table, but only a small percentage of the total. Wondering if anyone else has seen this before.
EDIT: I've noticed errors in the driver logs that mention the remote RPC client disassociating. This may be because the number of concurrent bulk inserts hitting the server is reaching some maximum threshold. I'm now trying a less powerful cluster to see whether the reduced parallelism fixes it.
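If reduced parallelism does turn out to help, an alternative to downsizing the cluster would be capping the write parallelism directly. A minimal sketch, assuming the same imports and bulkCopyConfig as in the code below (the partition count of 8 is an arbitrary placeholder, not a tested value):

%scala
// Untested idea: bulkCopyToSqlDB runs one bulk copy per partition,
// so coalescing to 8 partitions should limit the server to at most
// 8 simultaneous bulk-insert connections.
spark.table("my_databricks_tbl")
  .coalesce(8)
  .bulkCopyToSqlDB(bulkCopyConfig)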
Code:
%scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val bulkCopyConfig = Config(Map(
  "url" -> "myserver.database.windows.net",
  "databaseName" -> "mydb",
  "user" -> "myuser",
  "password" -> "mypw",
  "dbTable" -> "my_sql_tbl",
  "bulkCopyBatchSize" -> "1048576",  // rows sent to SQL Server per batch
  "bulkCopyTableLock" -> "false",    // no table-level lock, so partitions can write concurrently
  "loginTimeout" -> "3600",          // seconds to wait when opening a connection
  "bulkCopyTimeout" -> "100000000"   // seconds before a bulk copy operation times out
))
spark.table("my_databricks_tbl").bulkCopyToSqlDB(bulkCopyConfig)
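For reference, the number of concurrent bulk-copy connections this opens should equal the partition count of my_databricks_tbl, since the connector writes via foreachPartition (visible in the trace below), one connection per partition. A quick way to check that count:

%scala
// Each partition opens its own connection during the bulk copy, so the
// partition count is the number of simultaneous bulk inserts the job runs.
val numPartitions = spark.table("my_databricks_tbl").rdd.getNumPartitions
println(s"bulkCopyToSqlDB would open up to $numPartitions concurrent connections")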
Error dump:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 171 in stage 75.0 failed 4 times, most recent failure: Lost task 171.3 in stage 75.0 (TID 5315, 10.139.64.16, executor 172): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:227)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:796)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:2698)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:142)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:528)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:534)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2301)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2320)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2345)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:951)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:949)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:949)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2801)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2801)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2801)
at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3462)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3458)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2800)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.bulkCopyToSqlDB(DataFrameFunctions.scala:72)
at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3923847285087761:67)
at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3923847285087761:164)
at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw$$iw.<init>(command-3923847285087761:166)
at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw.<init>(command-3923847285087761:168)
at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw.<init>(command-3923847285087761:170)
at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw.<init>(command-3923847285087761:172)
at line12e7f598547340fe98bb640a4cccc5a836.$read.<init>(command-3923847285087761:174)
at line12e7f598547340fe98bb640a4cccc5a836.$read$.<init>(command-3923847285087761:178)
at line12e7f598547340fe98bb640a4cccc5a836.$read$.<clinit>(command-3923847285087761)
at line12e7f598547340fe98bb640a4cccc5a836.$eval$.$print$lzycompute(<notebook>:7)
at line12e7f598547340fe98bb640a4cccc5a836.$eval$.$print(<notebook>:6)
at line12e7f598547340fe98bb640a4cccc5a836.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:694)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:647)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:381)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:358)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:241)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:236)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:278)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:358)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:227)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:796)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:2698)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:142)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:528)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:534)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)