Я создал службу обработки файлов, которая считывает и импортирует XML-файлы из определенного каталога.
Служба запускает несколько рабочих, которые опрашивают очередь файлов для новых файлов и используют linq2sql для доступа к данным.Каждый рабочий поток имеет свой собственный текстовый текст.
Обрабатываемые файлы содержат несколько заказов, и каждый заказ содержит несколько адресов (Заказчик / Подрядчик / Субподрядчик)
Я определил область транзакций для обработки каждого из них.файл.Таким образом, я хочу убедиться, что весь файл обрабатывается правильно, или что весь файл откатывается при возникновении исключения:
try
{
using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
{
foreach (var order in orders)
{
HandleType1Order(order);
}
tx.Complete();
}
}
catch (SqlException ex)
{
if (ex.Number == SqlErrorNumbers.Deadlock)
{
throw new FileHandlerException("File Caused a Deadlock, retrying later", ex, true);
}
else
throw;
}
Одним из требований к службе является создание или обнаружение обновлений.адреса в XML-файлах.Поэтому я создал адресную службу, которая отвечает за управление адресами.Следующий фрагмент кода выполняется для каждого заказа (в методе HandleType1Order()
) в файле импорта xml () и, таким образом, является частью TransactionScope для всего файла ).
using (var tx = new TransactionScope())
{
address = GetAddressByReference(number);
if (address != null) //address is already known
{
Log.Debug("Found address {0} - {1}. Updating...", address.Code, address.Name);
UpdateAddress(address, name, number, isContractor, isSubContractor, isCustomer);
}
else
{
//address not known, so create it
Log.Debug("Address {0} not known, creating address", number);
address = CreateAddress(name, number, sourceSystemId, isContractor, isSubContractor,
isCustomer);
_addressRepository.Save(address);
}
_addressRepository.Flush();
tx.Complete();
}
То, что я пытаюсь сделать здесь, - это создать или обновить адрес с уникальным номером.
Метод GetAddressByReference(string number)
возвращает известный адрес или ноль, если адрес не найден.
public virtual Address GetAddressByReference(string reference)
{
return _addressRepository.GetAll().SingleOrDefault(a=>a.Code==reference);
}
Когда я запускаю сервис, он создает несколько адресов с одинаковым номером.Метод GetAddressByReference()
get вызывается одновременно и должен возвращать известный адрес, когда второй поток выполняет метод с тем же адресным номером, однако он возвращает ноль.Возможно, что-то не так с моими границами транзакций или уровнем изоляции, но я не могу заставить его работать.
Может ли кто-нибудь указать мне правильное направление?Помощь очень ценится !!
ps У меня нет проблем с блокировкой транзакций и возникновением отката, файл будет просто повторен при возникновении взаимоблокировки.
Edit 1 Код потока:
public void Work()
{
_isRunning = true;
while (true)
{
ImportFileTask task = _queue.Dequeue(); //dequeue blocks on empty queue
if (task == null)
break; //Shutdown worker when a null task is read from the queue
IFileImporter importer = null;
try
{
using (new LockFile(task.FilePath).Acquire()) //create a filelock to sync access accross all processes to the file
{
importer = _kernel.Resolve<IFileImporter>();
Log.DebugFormat("Processing file {0}", task.FilePath);
importer.Import(task.FilePath);
Log.DebugFormat("Done Processing file {0}", task.FilePath);
}
}
catch(Exception ex)
{
Log.Fatal(
"A Fatal exception occured while handling {0} --> {1}".FormatWith(task.FilePath, ex.Message), ex);
}
finally
{
if (importer != null)
_kernel.ReleaseComponent(importer);
}
}
_isRunning = false;
}
Вышеприведенный метод работает во всех наших рабочих потоках.Он использует Castle Windsor для разрешения FileImporter, который имеет временный образ жизни (таким образом, не разделяется между потоками).