Как устранить ошибку массовой вставки блоков данных в базу данных Azure - PullRequest
0 голосов
/ 22 октября 2019

Я пытаюсь запустить массовую вставку с использованием Scala и Spark Connector через блоки данных Azure. Я получаю закрытые ошибки соединения с SQL Server. Часть данных будет передаваться в таблицу назначения, но это лишь небольшой процент от общего количества. Хотите знать, если кто-то еще видел этот случай раньше.

РЕДАКТИРОВАТЬ: я заметил ошибки в драйверах, которые упоминают удаленного клиента удаленного RPC. Это может быть связано с тем, что количество массовых вставок, попадающих на сервер, достигает максимальных пороговых значений. Сейчас я пытаюсь использовать менее мощный кластер, чтобы определить, исправляет ли это какой-либо ограниченный параллелизм.

Код:

%scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val bulkCopyConfig = Config(Map(
  "url"               -> "myserver.database.windows.net",
  "databaseName"      -> "mydb",
  "user"              -> "myuser",
  "password"          -> "mypw",
  "dbTable"           -> "my_sql_tbl",
  "bulkCopyBatchSize" -> "1048576",
  "bulkCopyTableLock" -> "false",
  "loginTimeout"      -> "3600",
  "bulkCopyTimeout"   -> "100000000"
))

spark.table("my_databricks_tbl").bulkCopyToSqlDB(bulkCopyConfig)

Дамп ошибки:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 171 in stage 75.0 failed 4 times, most recent failure: Lost task 171.3 in stage 75.0 (TID 5315, 10.139.64.16, executor 172): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:227)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:796)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:2698)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:142)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:528)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:534)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2301)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2320)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2345)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:951)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:949)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:949)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2801)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2801)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2801)
    at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3462)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3458)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2800)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.bulkCopyToSqlDB(DataFrameFunctions.scala:72)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3923847285087761:67)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3923847285087761:164)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw$$iw.<init>(command-3923847285087761:166)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw$$iw.<init>(command-3923847285087761:168)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw$$iw.<init>(command-3923847285087761:170)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$$iw.<init>(command-3923847285087761:172)
    at line12e7f598547340fe98bb640a4cccc5a836.$read.<init>(command-3923847285087761:174)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$.<init>(command-3923847285087761:178)
    at line12e7f598547340fe98bb640a4cccc5a836.$read$.<clinit>(command-3923847285087761)
    at line12e7f598547340fe98bb640a4cccc5a836.$eval$.$print$lzycompute(<notebook>:7)
    at line12e7f598547340fe98bb640a4cccc5a836.$eval$.$print(<notebook>:6)
    at line12e7f598547340fe98bb640a4cccc5a836.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:694)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:647)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:381)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:358)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:241)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:236)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:278)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:358)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:227)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:796)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:2698)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:142)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2320)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:528)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:534)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Ответы [ 2 ]

0 голосов
/ 23 октября 2019

В соответствии с официальными документами для Microsoft SQL Server, как показано ниже, я думаю, что ваша проблема была вызвана двумя параметрами конфигурации bulkCopyBatchSize и bulkCopyTableLock.

  1. Выполнение операций массового копирования
  2. Управление размерами группового копирования

В первом документе есть описание TABLOCK.

TABLOCK : блокировка на уровне таблицы получается во время операции массового копирования. Этот параметр значительно повышает производительность, поскольку удержание блокировки только на время операции массового копирования уменьшает конфликт блокировки на таблице. Таблица может быть загружена несколькими клиентами одновременно, если таблица не имеет индексов и указано TABLOCK . По умолчанию поведение блокировки определяется параметром таблицы блокировка таблицы при массовой загрузке .

И основная причина - во втором документе.

Размер партии также может влиять на накладные расходы блокировки. При выполнении массового копирования для SQL Server подсказка TABLOCK может быть указана с использованием bcp_control для получения блокировки таблицы вместо блокировок строк. Блокировка одной таблицы может быть проведена с минимальными издержками для всей операции массового копирования. Если TABLOCK не указан, то блокировки удерживаются в отдельных строках, а накладные расходы на поддержание всех блокировок на время массового копирования могут снизить производительность. Поскольку блокировки удерживаются только для длины транзакции, указание размера пакета решает эту проблему путем периодической генерации коммита, который освобождает удерживаемые блокировки.

Количество строк, составляющих пакет, может оказать существенное влияние на производительностьпри массовом копировании большого количества строк. Рекомендации по размеру пакета зависят от типа выполняемого массового копирования.

  • При массовом копировании в SQL Server укажите подсказку для массового копирования TABLOCK и установите большой размер пакета.

  • Если TABLOCK не указан, ограничьте размеры партии менее 1000 строк.

Обратите внимание на слова выше: « Когда TABLOCK не указан, ограничьте размеры пакетов до 1000 строк. », поэтому я думаю, что предыдущие задачи вашего скрипта Scala использовали соединения пула соединенийБаза данных SQL Azure, то, чтобы вызвать следующие задачи не может получить необходимые подключения, когда вы установите значение bulkCopyTableLock с помощью false и значение bulkCopyBatchSize больше, чем 1000.

Поэтому, пожалуйста, попробуйтеустановите bulkCopyTableLock с помощью true, чтобы исправить это в своем коде Scala, даже уменьшить значение bulkCopyBatchSize должным образом.

0 голосов
/ 22 октября 2019

Я бы предложил уменьшить значение bulkCopyBatchSize, эта конфигурация относится к числу строк в каждом пакете, а не к размеру байтов.

https://static.javadoc.io/com.microsoft.sqlserver/mssql-jdbc/6.1.6.jre8-preview/com/microsoft/sqlserver/jdbc/SQLServerBulkCopyOptions.html#setBatchSize-int-

ЭтоЭто означает, что при текущей настройке "bulkCopyBatchSize" -> "1048576" вы загружаете более 1 миллиона строк в пакете

Посмотрите на образец MS: https://github.com/Azure/azure-sqldb-spark/blob/fa1cf19ed797648a20d9b7f474d7c2cd88829ada/samples/scripts/BulkCopySample.scala

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...