Во время массового копирования azure-sqldb-sparkCountToSqlDB происходит повреждение данных, когда для DWU хранилища данных Azure установлено низкое значение 100. Существует ли способ регулирования массовой копии, чтобы не возникала ошибка повреждения данных.
Когда мы увеличиваем DWU Azure Datawarehouse до 300. Эта ошибка не возникает, но мы хотели бы оставить DWU равным 100, чтобы цена оставалась низкой.
В настоящее время мы пытаемся снизить скорость копирования bulkCopyConfig.bulkCopyBatchSize, чтобы выяснить, решит ли это проблему.
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val bulkCopyConfig = Config(Map(
"url" -> jdbcHostname,
"databaseName" -> jdbcDatabase,
"dbTable" -> "*****************",
"user" -> jdbcUsername,
"password" -> jdbcPassword,
"connectTimeout" -> "5", //seconds
"queryTimeout" -> "5", //seconds
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
raw1_1.bulkCopyToSqlDB(bulkCopyConfig)
Во время массового копирования возникает следующая ошибка.
Последнее исключение - это исключение TDS Parser, где мы не можем интерпретировать данные. Я подозреваю, что это произошло из-за повреждения данных, поскольку хранилище данных Azure не может обработать объемную копию достаточно быстро. Есть ли способ ограничить поток копии?
2019-06-03 06:23:50 [ERROR] com.fourones.fobis.logsaver.Batch$ - org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 11.0 failed 4 times, most recent failure: Lost task 26.3 in stage 11.0
(TID 442, wn14-realsp.zoflgxonef1uxkn5kwl11tqc5d.syx.internal.cloudapp.net, executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: PdwManagedToNativeInteropException ErrorNumber: 46724,
MajorCode: 467, MinorCode: 24, Severity: 20, State: 2, Exception of type 'Microsoft.SqlServer.DataWarehouse.Tds.PdwManagedToNativeInteropException' was thrown.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:254)
at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:258)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:104)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:26)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1597)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:63)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:705)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7240)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2869)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:733)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1669)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:665)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)