Как распределенные транзакции ведут себя с несколькими подключениями к одной и той же БД в многопоточной среде? - PullRequest
6 голосов
/ 02 февраля 2010

Я пытаюсь определить поведение соединения нескольких баз данных в распределенной транзакции.

У меня есть длительный процесс, который порождает серию потоков, и каждый поток затем отвечает за управление своими подключениями к БД и тому подобное. Все это выполняется внутри области транзакции, и каждый поток зачисляется в транзакцию через объект DependentTransaction.

Когда я пошел, чтобы поставить этот процесс параллельно, я столкнулся с несколькими проблемами, а именно с тем, что, похоже, существует какой-то блок, препятствующий одновременному выполнению запросов в транзакции.

Что мне хотелось бы знать, так это то, как координатор транзакций обрабатывает запросы от нескольких соединений к одной и той же БД, и даже если желательно передавать объект соединения между потоками?

Я читал, что MS SQL допускает только одно соединение на транзакцию, но я ясно могу создать и инициализировать более одного соединения с одной и той же БД в одной транзакции. Я просто не могу выполнять потоки параллельно, не получая исключение «контекст транзакции используется другим сеансом» при открытии соединений. В результате соединения должны ждать выполнения, а не работать в одно и то же время, и в конце код выполняется до завершения, но нет чистой выгоды для многопоточности приложения из-за этой проблемы блокировки.

Код выглядит примерно так.

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

EDIT

Мои тесты убедительно показали, что это просто невозможно сделать. Даже если используется более одного соединения или используется одно и то же соединение, все запросы в транзакции или вопросы обрабатываются последовательно.

Возможно, они изменят это поведение в будущем.

1 Ответ

9 голосов
/ 02 февраля 2010

Во-первых, вы должны разделить то, что читали здесь и там о транзакциях SQL Server, на 2 разных случая: локальный и распределенный.

Локальные транзакции SQL :

  • SQL Server позволяет выполнять только один запрос для каждой локальной транзакции.
  • По умолчанию только один сеанс может быть зарегистрирован в локальной транзакции.Используя sp_getbindtoken и sp_bindsession, можно зарегистрировать несколько сеансов в локальной транзакции.Сеансы по-прежнему ограничены только одним выполнением запроса в любое время.
  • При использовании нескольких активных наборов результатов (MARS) один сеанс может выполнять несколько запросов.Все запросы должны быть зарегистрированы в одной локальной транзакции.

Распределенные транзакции :

  • Для нескольких сеансов локальная транзакция может быть зарегистрирована в одной транзакциираспределенная транзакция.
  • Каждый сеанс по-прежнему зарегистрирован в локальной транзакции с учетом всех ограничений, указанных выше для локальных транзакций
  • Локальные транзакции, зарегистрированные в распределенной транзакции, подлежат двухфазной фиксации, координируемой распределенной транзакцией
  • Все локальные транзакции в экземпляре, зарегистрированном в распределенной транзакции, по-прежнему независимы локальные транзакции, в основном это означает, что они имеют конфликтующие пространства имен блокировки.

Таким образом, когда клиент создает.Net TransactionScope и в рамках этой области транзакции выполняет несколько запросов на одном сервере, все эти запросы являются локальными транзакциями, зарегистрированными в распределенной транзакции.Простой пример:

class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

Создайте фиктивную тестовую таблицу:

create table test (id int not null identity(1,1) primary key, a varchar(100));

и запустите код в моем примере.Вы увидите, что оба запроса выполняются параллельно, каждый из которых извлекает 100 тыс. Строк в таблице, а затем оба фиксируют, когда завершается область транзакции.Таким образом, проблемы, с которыми вы сталкиваетесь, не связаны ни с SQL Server, ни с TransactionScope, они могут легко справиться с описанным вами сценарием.Более того, код очень прост и понятен, и нет никакой необходимости в создании зависимых транзакций, клонировании или в продвижении транзакций.

Обновлено

Использование явных потоков и зависимых транзакций:

 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }
...