C # для SQL Server SqlBulkCopy большое количество строк - PullRequest
0 голосов
/ 26 июня 2019

Я запускаю процесс для большого файла данных CSV, который анализируется в графе объектов, как этот ...

Company
 - Address
 -  References [] 
     - Ref 1

Это пользовательский анализатор, который выполняет построчный анализданные из исходного файла размером около 2 ГБ.

В результате получается, что объект разбивается на 3 разных пакета и передается в класс SqlBulkCopy для вставки в Sql.

Со временем Sql замедляется до тех пор, пока не истечет время ожидания.

Я могу увеличить тайм-аут, но все, что нужно сделать, это отодвинуть исключение назад и занять больше времени, чтобы достичь ... в конечном счете, хотя оно все равно будет достигнуто, чуть дальше вниз по процессу.

Есть ли способ принудительного применения правила постоянной производительности или чего-то в соединении / на SqlBulkCopy / в Sql Server, каким-то образом, который дает мне своего рода «фиксированное время ответа», так как все мои пакетыимеют известный фиксированный размер, и операция является просто повторением той же операции только с большим количеством данных?

Я ожидаю общее количество строк about 60 миллионов строк за всю операцию.

Вот как я это делаю ...

using (var parser = new MappedCSVParser<Company>(config, map, new BufferedStream(File.Open(sourceFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))))
{
    var data = parser.Parse();

    Company[] companies = data.Take(batchSize).Where(c => c.IsActive).ToArray();
    Address[] addresses = companies.Select(c => c.Address).ToArray();
    CompanyReference[] refs = companies.SelectMany(c => c.References).ToArray();

    using (var conn = new SqlConnection(connStr))
    {
        using (var addCopier = new SqlBulkCopier<Address>(conn))
        using (var comCopier = new SqlBulkCopier<Company>(conn))
        using (var refCopier = new SqlBulkCopier<CompanyReference>(conn))
        {
            var addressReader = new ObjectDataReader<Address>();
            var companyReader = new ObjectDataReader<Company>();
            var refReader = new ObjectDataReader<CompanyReference>();

            await conn.OpenAsync();

            while (companies.Any())
            {
                addressReader.SetSource(addresses);
                companyReader.SetSource(companies);
                refReader.SetSource(refs);

                await addCopier.WriteToServer(addressReader);
                await comCopier.WriteToServer(companyReader);
                await refCopier.WriteToServer(refReader);

                batchesSent++;
                companiesSent += companies.Length;
                rejections += batchSize - companies.Length;
                Log($"Progress - Batches Sent: {batchesSent}, Companies Sent: {companiesSent}, Rejected: {rejections}");

                companies = data.Take(batchSize).Where(c => c.IsActive).ToArray();
                addresses = companies.Select(c => c.Address).ToArray();
                refs = companies.SelectMany(c => c.References).ToArray();
            }

            conn.Close();
        }
    }
}

РЕДАКТИРОВАТЬ:

Как и было запрошено, моя реализация SqlBulkCopy ...

class SqlBulkCopier<T> : IDisposable
{
    SqlBulkCopy copier;
    SqlConnection connection;
    bool internalConnection = false;

    public SqlBulkCopier(string connStr) : this(new SqlConnection(connStr))
    {
        internalConnection = true;
    }

    public SqlBulkCopier(SqlConnection conn)
    {
        connection = conn;

        var flags = SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.KeepIdentity | SqlBulkCopyOptions.UseInternalTransaction;
        copier = new SqlBulkCopy(connection, flags, null);
        copier.BulkCopyTimeout = 180;

        foreach (var f in typeof(T).GetProperties().Where(p => p.PropertyType.IsValueType || p.PropertyType == typeof(string)))
            copier.ColumnMappings.Add(f.Name, f.Name);

        var tableInfo = typeof(T).GetCustomAttributes(typeof(TableAttribute), true)[0] as TableAttribute;
        copier.DestinationTableName = $"{tableInfo.Schema ?? "dbo"}.{tableInfo.Name}";
    }

    public Task Connect() { return connection.OpenAsync(); }

    public void Disconnect() { connection.Close(); }

    public Task WriteToServer(ObjectDataReader<T> reader)
    {
        return copier.WriteToServerAsync(reader);
    }

    public void Dispose()
    {
        if (copier != null)
        {
            if(connection.State == System.Data.ConnectionState.Open) connection.Close();
            if(internalConnection) connection.Dispose();

            copier = null;
            connection = null;
        }
    }
}
...