Многопоточное приложение C # с вызовами базы данных SQL Server - PullRequest
23 голосов
/ 31 марта 2012

У меня есть база данных SQL Server с 500 000 записей в таблице main. Есть также три другие таблицы с именами child1, child2 и child3. Отношения «многие ко многим» между child1, child2, child3 и main реализуются с помощью трех таблиц отношений: main_child1_relationship, main_child2_relationship и main_child3_relationship. Мне нужно прочитать записи в main, обновить main, а также вставить в таблицы отношений новые строки, а также вставить новые записи в дочерние таблицы. Записи в дочерних таблицах имеют ограничения уникальности, поэтому псевдокод для фактического вычисления (CalculateDetails) будет выглядеть примерно так:

for each record in main
{
   find its child1 like qualities
   for each one of its child1 qualities
   {
      find the record in child1 that matches that quality
      if found
      {
          add a record to main_child1_relationship to connect the two records
      }
      else
      {
          create a new record in child1 for the quality mentioned
          add a record to main_child1_relationship to connect the two records
      }
   }
   ...repeat the above for child2
   ...repeat the above for child3 
}

Это прекрасно работает как однопоточное приложение. Но это слишком медленно. Обработка в C # довольно трудоемка и занимает слишком много времени. Я хочу превратить это в многопоточное приложение.

Каков наилучший способ сделать это? Мы используем Linq to Sql.

До сих пор мой подход заключался в создании нового объекта DataContext для каждой партии записей из main и использовании ThreadPool.QueueUserWorkItem для его обработки. Однако эти пакеты наступают друг другу на ноги, потому что один поток добавляет запись, а затем следующий поток пытается добавить ту же самую, и ... Я получаю все виды интересных блокировок SQL Server.

Вот код:

    int skip = 0;
    List<int> thisBatch;
    Queue<List<int>> allBatches = new Queue<List<int>>();
    do
    {
        thisBatch = allIds
                .Skip(skip)
                .Take(numberOfRecordsToPullFromDBAtATime).ToList();
        allBatches.Enqueue(thisBatch);
        skip += numberOfRecordsToPullFromDBAtATime;

    } while (thisBatch.Count() > 0);

    while (allBatches.Count() > 0)
    {
        RRDataContext rrdc = new RRDataContext();

        var currentBatch = allBatches.Dequeue();
        lock (locker)  
        {
            runningTasks++;
        }
        System.Threading.ThreadPool.QueueUserWorkItem(x =>
                    ProcessBatch(currentBatch, rrdc));

        lock (locker) 
        {
            while (runningTasks > MAX_NUMBER_OF_THREADS)
            {
                 Monitor.Wait(locker);
                 UpdateGUI();
            }
        }
    }

А вот ProcessBatch:

    private static void ProcessBatch( 
        List<int> currentBatch, RRDataContext rrdc)
    {
        var topRecords = GetTopRecords(rrdc, currentBatch);
        CalculateDetails(rrdc, topRecords);
        rrdc.Dispose();

        lock (locker)
        {
            runningTasks--;
            Monitor.Pulse(locker);
        };
    }

И

    private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc, 
                                              List<int> thisBatch)
    {
        List<Record> topRecords;

        topRecords = rrdc.Records
                    .Where(x => thisBatch.Contains(x.Id))
                    .OrderBy(x => x.OrderByMe).ToList();
        return topRecords;
    }

CalculateDetails лучше всего объяснить псевдокодом вверху.

Я думаю, что должен быть лучший способ сделать это. Пожалуйста помоги. Большое спасибо!

Ответы [ 6 ]

48 голосов
/ 06 апреля 2012

Вот мой взгляд на проблему:

  • При использовании нескольких потоков для вставки / обновления / запроса данных в SQL Server или любой базе данных взаимоблокировки являются фактом жизни.Вы должны предположить, что они произойдут, и обращаться с ними надлежащим образом.

  • Это не значит, что мы не должны пытаться ограничивать возникновение взаимоблокировок.Тем не менее, легко прочитать об основных причинах взаимоблокировок и принять меры по их предотвращению, но SQL Server всегда вас удивит: -)

Почему-тодля взаимоблокировок:

  • Слишком много потоков - попробуйте ограничить количество потоков до минимума, но, конечно, нам нужно больше потоков для максимальной производительности.

  • Недостаточно индексов.Если выборки и обновления недостаточно избирательны, SQL будет снимать блокировки большего диапазона, чем это полезно.Попробуйте указать соответствующие индексы.

  • Слишком много индексов.Обновление индексов приводит к взаимоблокировкам, поэтому попытайтесь уменьшить индексы до необходимого минимума.

  • Слишком высокий уровень изоляции транзакции.По умолчанию уровень изоляции при использовании .NET - «Сериализуемый», тогда как по умолчанию при использовании SQL Server - «Зафиксированное чтение».Снижение уровня изоляции может очень помочь (если уместно, конечно).

Вот как я мог бы решить вашу проблему:

  • Я бы не сталЯ бы не использовал собственное решение для потоков, я бы использовал библиотеку TaskParallel.Мой основной метод выглядел бы примерно так:

    using (var dc = new TestDataContext())
    {
        // Get all the ids of interest.
        // I assume you mark successfully updated rows in some way
        // in the update transaction.
        List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList();
    
        var problematicIds = new List<ErrorType>();
    
        // Either allow the TaskParallel library to select what it considers
        // as the optimum degree of parallelism by omitting the 
        // ParallelOptions parameter, or specify what you want.
        Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8},
                            id => CalculateDetails(id, problematicIds));
    }
    
  • Выполните метод CalculateDetails с повторными попытками при возникновении тупиковых ситуаций

    private static void CalculateDetails(int id, List<ErrorType> problematicIds)
    {
        try
        {
            // Handle deadlocks
            DeadlockRetryHelper.Execute(() => CalculateDetails(id));
        }
        catch (Exception e)
        {
            // Too many deadlock retries (or other exception). 
            // Record so we can diagnose problem or retry later
            problematicIds.Add(new ErrorType(id, e));
        }
    }
    
  • ЯдроМетод CalculateDetails

    private static void CalculateDetails(int id)
    {
        // Creating a new DeviceContext is not expensive.
        // No need to create outside of this method.
        using (var dc = new TestDataContext())
        {
            // TODO: adjust IsolationLevel to minimize deadlocks
            // If you don't need to change the isolation level 
            // then you can remove the TransactionScope altogether
            using (var scope = new TransactionScope(
                TransactionScopeOption.Required,
                new TransactionOptions {IsolationLevel = IsolationLevel.Serializable}))
            {
                TestItem item = dc.TestItems.Single(i => i.Id == id);
    
                // work done here
    
                dc.SubmitChanges();
                scope.Complete();
            }
        }
    }
    
  • И, конечно, моя реализация помощника повтора тупиковой попытки

    public static class DeadlockRetryHelper
    {
        private const int MaxRetries = 4;
        private const int SqlDeadlock = 1205;
    
        public static void Execute(Action action, int maxRetries = MaxRetries)
        {
            if (HasAmbientTransaction())
            {
                // Deadlock blows out containing transaction
                // so no point retrying if already in tx.
                action();
            }
    
            int retries = 0;
    
            while (retries < maxRetries)
            {
                try
                {
                    action();
                    return;
                }
                catch (Exception e)
                {
                    if (IsSqlDeadlock(e))
                    {
                        retries++;
                        // Delay subsequent retries - not sure if this helps or not
                        Thread.Sleep(100 * retries);
                    }
                    else
                    {
                        throw;
                    }
                }
            }
    
            action();
        }
    
        private static bool HasAmbientTransaction()
        {
            return Transaction.Current != null;
        }
    
        private static bool IsSqlDeadlock(Exception exception)
        {
            if (exception == null)
            {
                return false;
            }
    
            var sqlException = exception as SqlException;
    
            if (sqlException != null && sqlException.Number == SqlDeadlock)
            {
                return true;
            }
    
            if (exception.InnerException != null)
            {
                return IsSqlDeadlock(exception.InnerException);
            }
    
            return false;
        }
    }
    
  • Еще одна возможность - использовать разбиениестратегия

Если ваши таблицы могут быть естественным образом разбиты на несколько отдельных наборов данных, то вы можете использовать SQL Server секционированные таблицы и индексы , или вы можете вручную разделите ваши существующие таблицы на несколько наборов таблиц.Я бы порекомендовал использовать разбиение SQL Server, так как второй вариант был бы грязным.Также встроенное разбиение доступно только в SQL Enterprise Edition.

Если для вас возможно разбиение, вы можете выбрать схему разбиения, которая разбила ваши данные, скажем, на 8 различных наборов.Теперь вы можете использовать свой оригинальный однопоточный код, но иметь 8 потоков, каждый из которых предназначен для отдельного раздела.Теперь не будет никаких (или, по крайней мере, минимального количества) взаимоблокировок.

Надеюсь, это имеет смысл.

5 голосов
/ 05 апреля 2012

Обзор

Корень вашей проблемы в том, что L2S DataContext, как и ObjectContext Entity Framework, не является поточно-ориентированным. Как объясняется в на этом форуме MSDN , поддержка асинхронных операций в решениях .NET ORM все еще ожидается, начиная с .NET 4.0; вам придется развернуть свое собственное решение, которое, как вы обнаружили, не всегда легко сделать, если ваша инфраструктура предполагает однопоточность.

Я воспользуюсь этой возможностью, чтобы отметить, что L2S построен поверх ADO.NET, который сам полностью поддерживает асинхронную работу - лично я бы предпочел иметь дело непосредственно с этим нижним уровнем и писать сам SQL, просто чтобы убедитесь, что я полностью понял, что происходит по сети.

SQL Server Solution?

При этом я должен спросить - должно ли это быть решение на C #? Если вы можете составить свое решение из набора операторов вставки / обновления, вы можете просто отправить SQL напрямую, и ваши проблемы с потоками и производительностью исчезнут. * Мне кажется, что ваши проблемы связаны не с реальными преобразованиями данных, сделано, но сосредоточиться вокруг того, чтобы сделать их производительными из .NET. Если .NET удаляется из уравнения, ваша задача становится проще. В конце концов, лучшим решением часто является то, что вы пишете наименьшее количество кода, верно? ;)

Даже если ваша логика обновления / вставки не может быть выражена строго в реляционной форме, в SQL Server есть встроенный механизм для перебора записей и выполнения логики - хотя они справедливо порочны для многих случаев использования, курсоры могут фактически подходить для вашей задачи.

Если это задача, которая должна повторяться неоднократно, вы могли бы извлечь большую пользу из ее кодирования как хранимой процедуры.

* Конечно, длительно работающий SQL приносит свои собственные проблемы, такие как эскалация блокировки и использование индекса, с которыми вам придется бороться.

Решение C #

Конечно, может случиться так, что об этом в SQL не может быть и речи - может быть, решения вашего кода зависят, например, от данных, поступающих из других мест, или, возможно, ваш проект имеет строгое соглашение, запрещающее использование SQL , Вы упоминаете некоторые типичные ошибки многопоточности, но, не видя ваш код, я не могу помочь с ними конкретно.

Делать это из C #, очевидно, целесообразно, но вам нужно учитывать тот факт, что для каждого звонка, который вы делаете, будет существовать фиксированная величина задержки. Вы можете уменьшить влияние задержки в сети, используя пулы соединений, активируя несколько активных наборов результатов и используя асинхронные методы Begin / End для выполнения ваших запросов. Даже несмотря на все это, вам все равно придется признать, что отправка данных из SQL Server в ваше приложение обходится дорого.

Один из лучших способов не допустить, чтобы ваш код перешагивал через себя, - это избегать максимально возможного разделения изменяемых данных между потоками. Это будет означать, что нельзя использовать один и тот же DataContext в нескольких потоках. Следующим лучшим подходом является блокировка критических участков кода, которые касаются общих данных - lock блокирует весь доступ к DataContext, от первого чтения до окончательной записи. Такой подход может полностью устранить преимущества многопоточности; Скорее всего, вы можете сделать свой замок более мелкозернистым, но будьте осторожны, это путь боли.

Гораздо лучше полностью отделить свои операции друг от друга. Если вы можете разделить свою логику между «основными» записями, это идеально, то есть, если нет связей между различными дочерними таблицами и если одна запись в «основной» не имеет последствий для Во-вторых, вы можете разделить свои операции на несколько потоков следующим образом:

private IList<int> GetMainIds()
{
    using (var context = new MyDataContext())
        return context.Main.Select(m => m.Id).ToList();
}

private void FixUpSingleRecord(int mainRecordId)
{
    using (var localContext = new MyDataContext())
    {
        var main = localContext.Main.FirstOrDefault(m => m.Id == mainRecordId);

        if (main == null)
            return;

        foreach (var childOneQuality in main.ChildOneQualities)
        {
            // If child one is not found, create it
            // Create the relationship if needed
        }

        // Repeat for ChildTwo and ChildThree

        localContext.SaveChanges();
    }
}

public void FixUpMain()
{
    var ids = GetMainIds();
    foreach (var id in ids)
    {
        var localId = id; // Avoid closing over an iteration member
        ThreadPool.QueueUserWorkItem(delegate { FixUpSingleRecord(id) });
    }
}

Очевидно, что это такой же игрушечный пример, как и псевдокод в вашем вопросе, но, надеюсь, он заставляет задуматься о том, как распределить свои задачи так, чтобы между ними не было (или было бы минимальное) общее состояние.Это, я думаю, будет ключом к правильному решению на C #.

РЕДАКТИРОВАТЬ Отвечая на обновления и комментарии

Если вы столкнулись с проблемами согласованности данных, я бы посоветовал применять семантику транзакций -Вы можете сделать это с помощью System.Transactions.TransactionScope (добавьте ссылку на System.Transactions).С другой стороны, вы можете сделать это на уровне ADO.NET, получив доступ к внутреннему соединению и вызвав на нем BeginTransaction (или как бы не вызывался метод DataConnection).

Вы также упоминаете взаимоблокировки.То, что вы боретесь с взаимоблокировками SQL Server, указывает на то, что реальные запросы SQL наступают друг другу на ноги.Не зная, что на самом деле отправляется по телеграфу, сложно сказать подробно, что происходит и как это исправить.Достаточно сказать, что взаимоблокировки SQL возникают в результате SQL-запросов, а не обязательно из потоковых конструкций C # - вам необходимо проверить, что именно происходит по проводам.Моя интуиция говорит мне, что если каждая «основная» запись действительно независима от других, тогда не должно быть необходимости в блокировке строк и таблиц, и что Linq to SQL, вероятно, является здесь виновником.

Выможет получить дамп необработанного SQL, генерируемого L2S в вашем коде, установив для свойства DataContext.Log что-то, например, Console.Out.Хотя я никогда не использовал его лично, я понимаю, что LINQPad предлагает средства L2S, и вы, возможно, также сможете использовать SQL там.

SQL Server Management Studio предоставит вам остальную часть пути -используя Activity Monitor, вы можете следить за эскалацией блокировки в реальном времени.С помощью Query Analyzer вы можете получить представление о том, как SQL Server будет выполнять ваши запросы.С их помощью вы сможете получить четкое представление о том, что делает ваш код на стороне сервера, и, в свою очередь, о том, как его исправить.

2 голосов
/ 06 апреля 2012

Я бы рекомендовал перенести всю обработку XML на сервер SQL.Мало того, что все ваши тупики исчезнут, но вы увидите такое повышение производительности, что вам никогда не захочется возвращаться.

Это будет лучше всего объяснено на примере.В этом примере я предполагаю, что блоб XML уже входит в вашу основную таблицу (я называю это шкафом).Я приму следующую схему:

CREATE TABLE closet (id int PRIMARY KEY, xmldoc ntext) 
CREATE TABLE shoe(id int PRIMARY KEY IDENTITY, color nvarchar(20))
CREATE TABLE closet_shoe_relationship (
    closet_id int REFERENCES closet(id),
    shoe_id int REFERENCES shoe(id)
)

И я ожидаю, что ваши данные (только основная таблица) изначально будут выглядеть так:

INSERT INTO closet(id, xmldoc) VALUES (1, '<ROOT><shoe><color>blue</color></shoe></ROOT>')
INSERT INTO closet(id, xmldoc) VALUES (2, '<ROOT><shoe><color>red</color></shoe></ROOT>')

Тогда вся ваша задача так же проста, какследующее:

INSERT INTO shoe(color) SELECT DISTINCT CAST(CAST(xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) AS color from closet
INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(CAST(closet.xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) = shoe.color

Но, учитывая, что вы будете выполнять много подобной обработки, вы можете упростить свою жизнь, объявив свой основной BLOB-объект как тип XML и еще более упростив его до этого:

INSERT INTO shoe(color)
    SELECT DISTINCT CAST(xmldoc.query('//shoe/color/text()') AS nvarchar)
    FROM closet
INSERT INTO closet_shoe_relationship(closet_id, shoe_id)
    SELECT closet.id, shoe.id
    FROM shoe JOIN closet
        ON CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) = shoe.color

Возможна дополнительная оптимизация производительности, например, предварительное вычисление многократно вызванных результатов Xpath во временной или постоянной таблице или преобразование начальной совокупности основной таблицы в BULK INSERT, но я не ожидаю, что вам действительно понадобитсяте, чтобы преуспеть.

1 голос
/ 05 апреля 2012

sql блокировки сервера нормальны и ожидаемы в сценарии этого типа - MS рекомендует, чтобы они обрабатывались на стороне приложения , а не на стороне дБ.вам нужно убедиться, что хранимая процедура вызывается только один раз, тогда вы можете использовать блокировку мьютекса sql с помощью sp_getapplock.Вот пример того, как реализовать это

BEGIN TRAN
DECLARE @mutex_result int;
EXEC @mutex_result = sp_getapplock @Resource = 'CheckSetFileTransferLock',
 @LockMode = 'Exclusive';

IF ( @mutex_result < 0)
BEGIN
    ROLLBACK TRAN

END

-- do some stuff

EXEC @mutex_result = sp_releaseapplock @Resource = 'CheckSetFileTransferLock'
COMMIT TRAN  
0 голосов
/ 10 апреля 2012

Если

  • У вас мало времени, чтобы потратить на эту проблему, и вам нужно исправить это прямо сейчас
  • Вы уверены, что ваш код сделан такэтот другой поток НЕ будет изменять одну и ту же запись
  • Вы не боитесь

Тогда ... вы можете просто добавить "БЕЗ БЛОКИРОВКИ" в ваши запросы, чтобы MSSQL непримените блокировки.

Чтобы использовать с осторожностью:)

Но в любом случае, вы не сказали нам, где теряется время (в однопоточной версии).Потому что, если это в коде, я посоветую вам написать все в БД напрямую, чтобы избежать непрерывного обмена данными.Если он находится в БД, я посоветую проверить индекс (слишком много?), Ввод / вывод, процессор и т. Д.

0 голосов
/ 06 апреля 2012

Это может быть очевидным, но циклический просмотр каждого кортежа и выполнение вашей работы в контейнере сервлета требует больших затрат на каждую запись.

Если возможно, перенесите часть или всю эту обработку на сервер SQL, переписав свою логику как одну или несколько хранимых процедур.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...