Взаимоблокировка EF Core с несколькими потоками + BeginTransaction + Commit - PullRequest
2 голосов
/ 07 мая 2020

У меня есть вопросы о том, как работают SaveChangesAsync() и BeginTransaction() + transaction.Commit().

В моей команде есть. NET Core worker, который получает события от Microsoft EventHub и сохраняет данные на SQL сервере через EF Core 3.
Один из типов событий содержит много данных, поэтому мы создал несколько таблиц, разделил данные и затем сохранил их в этих таблицах. Дочерние таблицы ссылаются на столбец id (FK_Key) родительской таблицы.
Некоторые данные в БД должны быть удалены перед сохранением новых данных при определенных условиях, поэтому мы удаляем -> вставляем данные.

Для сохранения данных в БД мы вызываем dbContext.Database.BeginTransaction() и transaction.Commit(). Когда мы запускаем воркер, мы получаем исключение взаимоблокировки, например Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

Я обнаружил, что один из .BatchDeleteAsync() в PurgeDataInChildTables() или один из BulkInsertOrUpdateAsync() в Upsert() выдает исключение взаимоблокировки (оно меняется каждый раз). раз бегаю рабочий).

Вот код:

public async Task DeleteAndUpsert(List<MyEntity> entitiesToDelete, List<MyEntity> entitiesToUpsert)
{
    if (entitiesToDelete.Any())
        await myRepository.Delete(entitiesToDelete);

    if (entitiesToUpsert.Any())
        await myRepository.Upsert(entitiesToUpsert);
}


public override async Task Upsert(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        using (var transaction = dbContext.Database.BeginTransaction())
        {
            await PurgeDataInChildTables(entities, dbContext);
            await dbContext.BulkInsertOrUpdateAsync(entities);
            // tables that depends on the parent table (FK_Key)
            await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child1>(x => x.Id).ToList());
            await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child2>(x => x.Id).ToList());
            await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child3>(x => x.Id).ToList());
            transaction.Commit();
        }
    }
}

public override async Task Delete(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        using (var transaction = dbContext.Database.BeginTransaction())
        {
            await PurgeDataInChildTables(entities, dbContext);
            await dbContext.BulkDeleteAsync(entities);
            transaction.Commit();
        }
    }
}

private async Task PurgeDataInChildTables(IList<MyEntity> entities, MyDbContext dbContext)
{
    var ids = entities.Select(x => x.Id).ToList();

    await dbContext.Child1.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
    await dbContext.Child2.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
    await dbContext.Child3.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
}

Когда рабочий запускается, он создает четыре потока, и все они загружаются в одну и ту же таблицу (и удаляются тоже). Итак, я предполагаю, что тупик возникает, когда один поток запускает транзакцию, а другой запускает другую транзакцию (или что-то подобное ..), а затем пытается выполнить вставку (или удаление из) дочерних таблиц.
Я попробовал кое-что, чтобы решить проблему и заметил, что тупик, кажется, разрешен, когда я удаляю BeginTransaction() и использую вместо него SaveChangesAsync().

Вот измененный код:

public override async Task Upsert(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        await PurgeDataInChildTables(entities, dbContext);
        await dbContext.BulkInsertOrUpdateAsync(entities);
        // tables that depends on the parent table (FK_Key)
        await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child1).ToList());
        await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child2).ToList());
        await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child3).ToList());
        await dbContext.SaveChangesAsync();
    }
}

public override async Task Delete(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        await PurgeDataInChildTables(entities, dbContext);
        await dbContext.BulkDeleteAsync(entities);
        await dbContext.SaveChangesAsync();
    }
}

Тупик произошел примерно через 30 секунд после запуска воркера, но этого не произошло в течение 2–3 минут, когда я изменил код, поэтому Я думаю, что проблема решена, подумал, что она все еще может возникнуть, если я буду запускать воркер дольше.

Наконец, вот мои вопросы:

  • Когда я использую BeginTransaction() + .Commit(), возникает тупик, но этого не происходит, когда я использую SaveChangesAsync(). Почему?
  • В чем разница между этими двумя методами с точки зрения транзакции?
  • Если измененный код все еще может вызвать тупик или не является хорошим решением, как мне решить эту проблему?

1 Ответ

1 голос
/ 08 мая 2020

Трудно сказать, не глядя на сеанс профилирования базы данных. Здесь необходимо выяснить, какие блокировки были приняты (где это shared, а где exclusive или update) и когда транзакция фактически открыта . Я опишу теоретическое поведение, которое необходимо доказать с помощью фактического профилирования базы данных.

Когда вы оборачиваете все с помощью Database.BeginTransaction () :
Уровень изоляции не устанавливается EF, он использует уровень изоляции базы данных по умолчанию. В случае Microsoft SQL Server это будет Read committed. Этот уровень изоляции говорит о том, что параллельные транзакции могут читать данные, но если происходит текущая модификация, другие транзакции будут ждать ее завершения, даже если они хотят просто прочитать. Транзакция будет удерживаться до вызова Commit().

Если вы не укажете транзакцию явно :
Операторы Select и SaveChangesAsync приведут к разделению транзакций с одинаковой изоляцией уровень по умолчанию для базы данных. Транзакция не держится дольше, чем нужно: например, в случае SaveChangesAsync, она будет там, пока записываются все изменения, начиная с момента вызова метода.

Транзакция (идентификатор процесса 71) зашла в тупик при блокировке ресурсов с другим процессом и была выбрана жертвой тупика. Повторите транзакцию.

Это сообщение появляется, когда несколько транзакций пытаются получить доступ к какому-либо ресурсу, и одна из них пытается прочитать данные, а другая пытается изменить. В этом случае, чтобы избежать мертвой блокировки, база данных попытается убить транзакцию, для отката которой потребуется меньшее количество ресурсов. В вашем случае - это транзакция, которая пытается прочитать. Чтения невелики с точки зрения отката. ресурс как база данных просто убивает транзакции других рабочих, когда они пытаются читать, вероятно, в этой точке var ids = entities.Select(x => x.Id).ToList();. Когда вы переписали свой код, вы избавились от длинных блокировок. Более того, как я вижу из документации к BulkInsertOrUpdateAsyn c, это расширение использует внутренние транзакции при каждом вызове, не влияя и не вовлекая контекст EF. Если это так, то это означает, что фактические транзакции живут даже меньше, чем один вызов SaveChangesAsync, когда данные меняются не с расширением, а обычным способом EF.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...