Я бы взял другой подход к решению этой проблемы. Используя приведенный ниже код, мы можем объединить данные из нескольких файлов, все с одинаковыми именами, во фрейм данных и вставить все это в SQL Server. Это Scala, поэтому его нужно запускать в среде Azure Databricks.
# merge files with similar names into a single dataframe
val DF = spark.read.format("csv")
.option("sep","|")
.option("inferSchema","true")
.option("header","false")
.load("mnt/rawdata/corp/ABC*.gz")
DF.count()
# rename headers in dataframe
val newNames = Seq("ID", "FName", "LName", "Address", "ZipCode", "file_name")
val dfRenamed = df.toDF(newNames: _*)
dfRenamed.printSchema
# push the dataframe to sql server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
// Aquire a DataFrame collection (val collection)
val config = Config(Map(
"url" -> "my_sql_server.database.windows.net",
"databaseName" -> "my_db_name",
"dbTable" -> "dbo.my_table",
"user" -> "xxxxx",
"password" -> "xxxxx",
"connectTimeout" -> "5", //seconds
"queryTimeout" -> "5" //seconds
))
import org.apache.spark.sql.SaveMode
DF.write.mode(SaveMode.Append).sqlDB(config)
Код выше будет читать каждую строку каждого файла. Если заголовки находятся в первой строке, это прекрасно работает. Если заголовки и NOT в первой строке, используйте код ниже, чтобы создать конкретную схему, и снова прочитайте каждую строку каждого файла.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.functions.input_file_name
val customSchema = StructType(Array(
StructField("field1", StringType, true),
StructField("field2", StringType, true),
StructField("field3", StringType, true),
StructField("field4", StringType, true),
StructField("field5", StringType, true),
StructField("field6", StringType, true),
StructField("field7", StringType, true)))
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("sep", "|")
.schema(customSchema)
.load("mnt/rawdata/corp/ABC*.gz")
.withColumn("file_name", input_file_name())
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.write.mode(SaveMode.Append).
//df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.