Прочитать файл определенного типа с помощью одного потока - PullRequest
0 голосов
/ 13 июня 2019

У меня есть папка с несколькими файлами, я намереваюсь прочитать эти файлы и вставить в их таблицы с указанием типа.

Проблема:

Я читаю файлы с несколькими потоками, нокаждый раз, когда поток пытается прочитать тип (строку) файла, который читается другим потоком, происходит сбой с ошибкой:

Транзакция (ID процесса 69) блокируется при блокировке |ресурсы буфера связи с другим процессом и были выбраны в качестве жертвы тупика.Перезапустите транзакцию.в System.Data.SqlClient.SqlConnection.OnError (исключение SqlException, логическое breakConnection, завершение действия 1 wrapCloseInAction) at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose) at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady) at System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String methodName, Boolean async, Int32 timeout, Boolean asyncWrite) at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(TaskCompletionSource 1, строковое methodName, логическое sendToPipe, тайм-аут Int32, логическое и usedCache, логическое asyncWrite, логический inRetry.lc..ExecuteNonQuery () в Microsoft.Practices.EnterpriseLibrary.Data.Database.DoExecuteNonQuery (команда DbCommand) в Microsoft.Practices.EnterpriseLibrary.Data.Database.ExecuteNonQuery (команда DbCommand)

* 1011 я мог бы сделать с этимодин поток, но это занимает много времени, с многопоточностью, Как я мог избежать сбоя?

Код:

 public int InsertData(string data, string tableName)
        {
            string query;
            var token = JToken.Parse(data)["data"];
            JArray jArrayData;
            if (token is JArray)
            {
                jArrayData = token as JArray;// JArray.Parse(data) as JArray;
                foreach (var item in jArrayData.Children())
                {
                    var itemProperties = item.Children<JProperty>();
                    query = ServerHelper.CreateInsertQuery(itemProperties, tableName);
                    ExecuteQuery(query);
                }
            }

            return 0;
        }

   private int ExecuteQuery(string query)
        {
            if(conn.State == ConnectionState.Closed)
            {
                conn.Open();
            }
            SqlCommand cmd = new SqlCommand(query, conn);
            return cmd.ExecuteNonQuery();
        }


 public static string CreateInsertQuery(JEnumerable<JProperty> itemProperties, string tableName)
        {
            string[] nameArray = itemProperties.Select(x => x.Name).ToArray();
            string[] valuesArray = itemProperties.Select(x => "'" + x.Value.ToString().Replace("'","''") + "'").ToArray();
            //check for null
              string query = "Insert Into " + tableName + " (" + string.Join(",", nameArray) + @")"
         + " values(" + string.Join(",", valuesArray) + ")";
        return query;
        }

Код, который пытается выполнять запросы параллельно:

   public void PerformInitialization()
        {
            ts = new CancellationTokenSource();
            ct = ts.Token;

            ChangeProcToDat();

            string strMaxThreads = ConfigurationManager.AppSettings["MaxThreads"].ToString();
            if(!string.IsNullOrEmpty(strMaxThreads))
            {
                bool parseSuccess = Int32.TryParse(strMaxThreads, out maxThreads);
                if(parseSuccess == false)
                {
                    maxThreads = 1;
                }
            }

            processQueueTasks = new Task[maxThreads];

            processData = new ProcessData();

            for (int i = 0; i < maxThreads; i++)
            {
                processQueueTasks[i] = Task.Factory.StartNew(() => ProcessQueue(), ct);
            }
        }

 public void ProcessQueue()
    {
        bool cancelled = false;
        try
        {

            while (!ct.IsCancellationRequested)
            {
                try
                {

                    Directory.CreateDirectory(queueFolderPath);

                    DirectoryInfo queueFolderDi = new DirectoryInfo(queueFolderPath);

                    FileInfo datFile = queueFolderDi.GetFiles("*.dat", SearchOption.AllDirectories).OrderBy(f => f.LastWriteTime).ToList().FirstOrDefault();
                    while (datFile != null)
                    {
                        string processingFileName = datFile.FullName.Replace(".dat", ".proc");
                        FileInfo file = new FileInfo(processingFileName);
                        File.Move(datFile.FullName, processingFileName);

                        try
                        {                                

                            if (file.Name.Contains("_ABC_"))
                            {
                                var fileNameArr = file.Name.Split('_', '.');

                                var fileType = "";  # this is the type of the file
                                if (fileNameArr.Length >= 3)
                                {
                                    fileType = fileNameArr[3];
                                }

                                // the db part

                               var content = File.ReadAllText(file.FullName);

                                int responseCode = 0;
                                InsertData(content, reportType);

                                File.Delete(file.FullName);


                            }

                        }
                        catch (Exception ex)
                        {
                            Log.WriteToFailed("Error processing queue file " + file.Name + " "
                                + Environment.NewLine + ex.Message + Environment.NewLine + ex.StackTrace + Environment.NewLine);

                        }


                        datFile = queueFolderDi.GetFiles("*.dat", SearchOption.AllDirectories).OrderBy(f => f.LastWriteTime).ToList().FirstOrDefault();


                    }


                }
                catch (Exception ex)
                {
                    Log.WriteToFailed("Error processing files in the queue folder."
                                + Environment.NewLine + ex.Message + Environment.NewLine + ex.StackTrace + Environment.NewLine);

                }
                finally
                {
                    cancelled = ct.WaitHandle.WaitOne(1000);
                }


            }
        }
        catch (Exception ex)
        {
            Log.WriteToFailed("Error occured in send thread. Message:" + ex.Message + "\n Stack trace:" + ex.StackTrace);
        }
    }

1 Ответ

0 голосов
/ 13 июня 2019

Похоже, фактический вопрос в том, как быстро вставить записи из большого количества файлов JSON в таблицу SQL Server.

Производительность и взаимоблокировки

Попытка решения медленная, поскольку она пытается вставить записи Row By Agonizing Row (RBAR).Это самый медленный способ сделать это.

Он блокируется, потому что он использует долгоживущие соединения, что означает, что блокировки строк и таблиц накапливаются и удерживаются, пока соединение остается открытым.Если таблица не имеет первичного ключа или некластеризованного индекса, SQL Server, возможно, придется взять больше блокировок на странице данных или даже на уровне таблицы и сохранять их до тех пор, пока соединение остается открытым.

Использование многопоточности в этомдело может усугубить ситуацию, так как несколько соединений ждут друг друга.Если отсутствующие индексы вызывают чрезмерную блокировку, соединения могут в конечном итоге блокировать страницы данных, необходимые друг другу, и, следовательно, взаимоблокировать.

Существует множество способов вставки данных JSON в SQL Server

JSON в SQL Server 2016 и более поздних версиях .

SQL Server 2016 и имеют функции JSON, которые можно использовать для анализа строк JSON.Можно написать хранимую процедуру, которая принимает строку JSON в качестве параметра, анализирует ее с помощью OPENJSON и вставляет значения в таблицу.Кража примера из OPENJSON - Самый простой способ импортировать текст JSON в таблицу :

 INSERT INTO Person
 SELECT * 
 FROM OPENJSON(@json)
 WITH (id int,
       firstName nvarchar(50), lastName nvarchar(50), 
       isAlive bit, age int,
       dateOfBirth datetime2, spouse nvarchar(50))

При отправке всего содержимого файла на SQL Server не будет использоваться more данных, чем отправка отдельных текстовых значений.

OPENROWSET

Другой вариант - загрузить файлы JSON напрямую с помощью OPENROWSET и проанализировать его с помощью OPENJSON, например:

INSERT INTO TargetTable(ID,Name,Price,Pages,Author)
SELECT book.id, book.name, book.price, book.pages_i, book.author
 FROM OPENROWSET (BULK 'C:\JSON\Books\books.json', SINGLE_CLOB) as j
 CROSS APPLY OPENJSON(BulkColumn)
 WITH( id nvarchar(100), name nvarchar(100), price float,
 pages_i int, author nvarchar(100)) AS book

Разбор на стороне клиента и SqlBulkCopy

Другой вариант - это анализ текста на стороне клиента, например, с помощью JSON.NET, и массовая вставка данных с использованием SqlBulkCopy.SqlBulkCopy использует те же механизмы, которые использовались BULK INSERT или bcp для максимально быстрой вставки данных с минимальным ведением журнала.

FastMember's ObjectReader можно использовать для преобразования списка элементов вIDataReader ожидается SqlBulkCopy.Код может быть примерно таким:

var json=File.ReadAllText(path);
var data=JsonConvert.DeserializeObject<List<SomeType>>(json);
using(var bcp = new SqlBulkCopy(connection)) 
using(var reader = ObjectReader.Create(data, "Id", "Name", "Description")) 
{ 
  bcp.DestinationTableName = "SomeTable"; 
  bcp.WriteToServer(reader); 
}

Или, как метод:

void ImportJson<T>(string path,string tableName,string[] fields)
{
    var json=File.ReadAllText(path);
    var data=JsonConvert.DeserializeObject<List<T>>(json);
    using(var bcp = new SqlBulkCopy(connection)) 
    using(var reader = ObjectReader.Create(data, fields)) 
    { 
        bcp.DestinationTableName = tableName; 
        bcp.WriteToServer(reader); 
    }
}

Разбор на стороне клиента и многострочные INSERT

Вместо того, чтобы выполнять отдельные операторы INSERT, производительность может быть улучшена путем их пакетирования или использования многострочных операторов INSERT, например:

INSERT INTO TABLE TableA (Col1,Col2)
VALUES
(Val11,Val12),
(Val21,Val22)

Хотя писать это вручную скучно.Можно использовать другое творение Марка Гравелла, Dapper , чтобы выполнить несколько ВСТАВК:

var json=File.ReadAllText(path);
var data=JsonConvert.DeserializeObject<List<T>>(json);
conn.Execute("INSERT INTO TableA (Col1,Col2) VALUES(@Prop1,@Prop2)",data);

Есть и другие варианты.Данные могут быть отправлены в виде табличного параметра, еще одну вещь, которую проще сделать с Dapper

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...