SqlBulkCopy.WriteToServerAsyn c не учитывает ключевое слово `await`. Почему? - PullRequest
2 голосов
/ 19 февраля 2020

SqlBulkCopy.WriteToServerAsyn c не соответствует ключевому слову await. Почему?

Вот мой код:

public async Task UpdateDBWithXML(Action<Func<DataTable, Task>> readXmlInBatches, string hashKey, string hash)
{
    using (var transaction = this.Context.Database.BeginTransaction(IsolationLevel.ReadUncommitted))
    using (var bulk = new SqlBulkCopy((SqlConnection)this.Connection, SqlBulkCopyOptions.Default, (SqlTransaction)transaction.UnderlyingTransaction))
    {
        //this.Context.Database.ExecuteSqlCommand("DELETE FROM [dbo].[LegalContractorTemps]");

        bulk.DestinationTableName = "LegalContractorTemps";
        readXmlInBatches(async (DataTable table) =>
        {
            if (bulk.ColumnMappings.Count == 0)
            {
                foreach (DataColumn column in table.Columns)
                {
                    bulk.ColumnMappings.Add(new SqlBulkCopyColumnMapping(column.ColumnName, column.ColumnName));
                }
            }

            await bulk.WriteToServerAsync(table);
        });

        await this.Context.Database.ExecuteSqlCommandAsync(
            "EXECUTE dbo.LegalContractorsDataSynchronize @hashKey, @hash",
            new SqlParameter("@hashKey", hashKey),
            new SqlParameter("@hash", hash)
        );

        transaction.Commit();
    }
}

В параметре readXmlInBatches я передаю следующую функцию в качестве аргумента:

public void ReadXMLInBatches(Func<DataTable, Task> processBatch)
{
    int batchSize = 10000;
    var table = new DataTable();
    foreach (var col in columnNames)
    {
        table.Columns.Add(col);
    }

    using (var reader = new StreamReader(pathToXml, Encoding.GetEncoding(encoding)))
    using (var xmlReader = XmlReader.Create(reader))
    {
        string lastElement = null;
        DataRow lastRow = null;
        while (xmlReader.Read())
        {
            switch (xmlReader.NodeType)
            {
                case XmlNodeType.Element:
                    if (xmlReader.Name == "RECORD")
                    {
                        if (table.Rows.Count >= batchSize)
                        {
                            processBatch(table);
                            table.Rows.Clear();
                        }

                        lastRow = table.Rows.Add();
                    }
                    lastElement = xmlReader.Name;
                    break;
                case XmlNodeType.Text:
                    ReadMember(lastRow, lastElement, xmlReader.Value);
                    break;
            }
        }
        if (table.Rows.Count > 0)
        {
            processBatch(table);
            table.Rows.Clear();
        }
    }
}

Я имею в XML что-то около 1,7 миллиона записей. После того, как моя программа прочитала несколько пакетов, я получаю сообщение об ошибке:

System.Data.RowNotInTableException : 'Эта строка была удалена из таблицы и не имеет никаких данных , BeginEdit () позволит создавать новые данные в этой строке. '

Я исследовал исходный код SqlBulkCopy. И нашел метод, который выдает ошибку:

public Task WriteToServerAsync(DataTable table, DataRowState rowState, CancellationToken cancellationToken) {
            Task resultTask = null;
            SqlConnection.ExecutePermission.Demand();

            if (table == null) {
                throw new ArgumentNullException("table");
            }

            if (_isBulkCopyingInProgress){
                throw SQL.BulkLoadPendingOperation();
            }

            SqlStatistics statistics = Statistics;
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
                _rowSource = table;
                _SqlDataReaderRowSource = null;
                _dataTableSource = table;
                _rowSourceType = ValueSourceType.DataTable;
                _rowEnumerator = table.Rows.GetEnumerator();
                _isAsyncBulkCopy = true;
                resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; 
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
            return resultTask;
        }

Я заметил поле _isBulkCopyingInProgress и решил проверить его при отладке. И я обнаружил, что при возникновении ошибки поле true. Как это возможно? Я ожидал бы, что массовая вставка произойдет первой (до того, как выполнение продолжится и WriteToServerAsync будет вызван во второй раз), так как я добавляю await здесь: await bulk.WriteToServerAsync(table);.

Чего я мог пропустить

1 Ответ

3 голосов
/ 19 февраля 2020

Вы передаете асинхронную функцию на ReadXMLInBatches, но ее выполнение внутри вашего метода не ожидается, поэтому ReadXMLInBatches может завершиться до того, как завершатся все вызовы WriteToServerAsync.

Попробуйте следующие изменения:

public async Task ReadXMLInBatchesAsync(Func<DataTable, Task> processBatch)
{
    //...
    await processBatch(table);
    //...
}

public async Task UpdateDBWithXML(Func<Func<DataTable, Task>, Task> readXmlInBatches, string hashKey, string hash)
{
    //...
    await readXmlInBatches(async (DataTable table) =>
    //...
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...