Многопоточные приложения linq2sql Трудности TransactionScope - PullRequest
0 голосов
/ 22 апреля 2011

Я создал службу обработки файлов, которая считывает и импортирует XML-файлы из определенного каталога.

Служба запускает несколько рабочих, которые опрашивают очередь файлов для новых файлов и используют linq2sql для доступа к данным.Каждый рабочий поток имеет свой собственный текстовый текст.

Обрабатываемые файлы содержат несколько заказов, и каждый заказ содержит несколько адресов (Заказчик / Подрядчик / Субподрядчик)

Я определил область транзакций для обработки каждого из них.файл.Таким образом, я хочу убедиться, что весь файл обрабатывается правильно, или что весь файл откатывается при возникновении исключения:

        try
        {
            using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                foreach (var order in orders)
                {
                    HandleType1Order(order);
                }
                tx.Complete();
            }
        }
        catch (SqlException ex)
        {
            if (ex.Number == SqlErrorNumbers.Deadlock)
            {
                throw new FileHandlerException("File Caused a Deadlock, retrying later", ex, true);
            }
            else
                throw;
        }

Одним из требований к службе является создание или обнаружение обновлений.адреса в XML-файлах.Поэтому я создал адресную службу, которая отвечает за управление адресами.Следующий фрагмент кода выполняется для каждого заказа (в методе HandleType1Order()) в файле импорта xml () и, таким образом, является частью TransactionScope для всего файла ).

 using (var tx = new TransactionScope())
            {

                address = GetAddressByReference(number);
                if (address != null) //address is already known
                {
                    Log.Debug("Found address {0} - {1}. Updating...", address.Code, address.Name);
                    UpdateAddress(address, name, number, isContractor, isSubContractor, isCustomer);
                }
                else
                {
                    //address not known, so create it
                    Log.Debug("Address {0} not known, creating address", number);
                    address = CreateAddress(name, number, sourceSystemId, isContractor, isSubContractor,
                                            isCustomer);
                    _addressRepository.Save(address);

                }

                _addressRepository.Flush();
                tx.Complete();
            }

То, что я пытаюсь сделать здесь, - это создать или обновить адрес с уникальным номером.

Метод GetAddressByReference(string number) возвращает известный адрес или ноль, если адрес не найден.

 public virtual Address GetAddressByReference(string reference)
 {
     return _addressRepository.GetAll().SingleOrDefault(a=>a.Code==reference);
 }

Когда я запускаю сервис, он создает несколько адресов с одинаковым номером.Метод GetAddressByReference() get вызывается одновременно и должен возвращать известный адрес, когда второй поток выполняет метод с тем же адресным номером, однако он возвращает ноль.Возможно, что-то не так с моими границами транзакций или уровнем изоляции, но я не могу заставить его работать.

Может ли кто-нибудь указать мне правильное направление?Помощь очень ценится !!

ps У меня нет проблем с блокировкой транзакций и возникновением отката, файл будет просто повторен при возникновении взаимоблокировки.


Edit 1 Код потока:

        public void Work()
    {
        _isRunning = true;
        while (true)
        {
            ImportFileTask task = _queue.Dequeue(); //dequeue blocks on empty queue               
            if (task == null)
                break; //Shutdown worker when a null task is read from the queue

            IFileImporter importer = null;
            try
            {
                using (new LockFile(task.FilePath).Acquire()) //create a filelock to sync access accross all processes to the file
                {
                    importer = _kernel.Resolve<IFileImporter>();
                    Log.DebugFormat("Processing file {0}", task.FilePath);
                    importer.Import(task.FilePath);
                    Log.DebugFormat("Done Processing file {0}", task.FilePath);
                }
            }
            catch(Exception ex)
            {
                Log.Fatal(
                    "A Fatal exception occured while handling {0} --> {1}".FormatWith(task.FilePath, ex.Message), ex);
            }
            finally
            {
                if (importer != null)
                    _kernel.ReleaseComponent(importer);
            }

        }

        _isRunning = false;
    }

Вышеприведенный метод работает во всех наших рабочих потоках.Он использует Castle Windsor для разрешения FileImporter, который имеет временный образ жизни (таким образом, не разделяется между потоками).

1 Ответ

0 голосов
/ 24 апреля 2011

Вы не опубликовали свой код потока, поэтому трудно сказать, в чем проблема. Я предполагаю, что вы запустили DTC (координатор распределенных транзакций)?

Вы используете ThreadPool? Вы используете ключевое слово "lock"?

http://msdn.microsoft.com/en-us/library/c5kehkcz.aspx

...