Я запускаю процесс для большого файла данных CSV, который анализируется в графе объектов, как этот ...
Company
- Address
- References []
- Ref 1
Это пользовательский анализатор, который выполняет построчный анализданные из исходного файла размером около 2 ГБ.
В результате получается, что объект разбивается на 3 разных пакета и передается в класс SqlBulkCopy для вставки в Sql.
Со временем Sql замедляется до тех пор, пока не истечет время ожидания.
Я могу увеличить тайм-аут, но все, что нужно сделать, это отодвинуть исключение назад и занять больше времени, чтобы достичь ... в конечном счете, хотя оно все равно будет достигнуто, чуть дальше вниз по процессу.
Есть ли способ принудительного применения правила постоянной производительности или чего-то в соединении / на SqlBulkCopy / в Sql Server, каким-то образом, который дает мне своего рода «фиксированное время ответа», так как все мои пакетыимеют известный фиксированный размер, и операция является просто повторением той же операции только с большим количеством данных?
Я ожидаю общее количество строк about 60 миллионов строк за всю операцию.
Вот как я это делаю ...
using (var parser = new MappedCSVParser<Company>(config, map, new BufferedStream(File.Open(sourceFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))))
{
var data = parser.Parse();
Company[] companies = data.Take(batchSize).Where(c => c.IsActive).ToArray();
Address[] addresses = companies.Select(c => c.Address).ToArray();
CompanyReference[] refs = companies.SelectMany(c => c.References).ToArray();
using (var conn = new SqlConnection(connStr))
{
using (var addCopier = new SqlBulkCopier<Address>(conn))
using (var comCopier = new SqlBulkCopier<Company>(conn))
using (var refCopier = new SqlBulkCopier<CompanyReference>(conn))
{
var addressReader = new ObjectDataReader<Address>();
var companyReader = new ObjectDataReader<Company>();
var refReader = new ObjectDataReader<CompanyReference>();
await conn.OpenAsync();
while (companies.Any())
{
addressReader.SetSource(addresses);
companyReader.SetSource(companies);
refReader.SetSource(refs);
await addCopier.WriteToServer(addressReader);
await comCopier.WriteToServer(companyReader);
await refCopier.WriteToServer(refReader);
batchesSent++;
companiesSent += companies.Length;
rejections += batchSize - companies.Length;
Log($"Progress - Batches Sent: {batchesSent}, Companies Sent: {companiesSent}, Rejected: {rejections}");
companies = data.Take(batchSize).Where(c => c.IsActive).ToArray();
addresses = companies.Select(c => c.Address).ToArray();
refs = companies.SelectMany(c => c.References).ToArray();
}
conn.Close();
}
}
}
РЕДАКТИРОВАТЬ:
Как и было запрошено, моя реализация SqlBulkCopy ...
class SqlBulkCopier<T> : IDisposable
{
SqlBulkCopy copier;
SqlConnection connection;
bool internalConnection = false;
public SqlBulkCopier(string connStr) : this(new SqlConnection(connStr))
{
internalConnection = true;
}
public SqlBulkCopier(SqlConnection conn)
{
connection = conn;
var flags = SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.UseInternalTransaction;
copier = new SqlBulkCopy(connection, flags, null);
copier.BulkCopyTimeout = 180;
foreach (var f in typeof(T).GetProperties().Where(p => p.PropertyType.IsValueType || p.PropertyType == typeof(string)))
copier.ColumnMappings.Add(f.Name, f.Name);
var tableInfo = typeof(T).GetCustomAttributes(typeof(TableAttribute), true)[0] as TableAttribute;
copier.DestinationTableName = $"{tableInfo.Schema ?? "dbo"}.{tableInfo.Name}";
}
public Task Connect() { return connection.OpenAsync(); }
public void Disconnect() { connection.Close(); }
public Task WriteToServer(ObjectDataReader<T> reader)
{
return copier.WriteToServerAsync(reader);
}
public void Dispose()
{
if (copier != null)
{
if(connection.State == System.Data.ConnectionState.Open) connection.Close();
if(internalConnection) connection.Dispose();
copier = null;
connection = null;
}
}
}