Одновременное чтение / запись MySQL EF Core - PullRequest
0 голосов
/ 21 марта 2020

Использование EF Core 2.2.6 и Pomelo.EntityFrameworkCore. MySql 2.2.6 (с MySqlConnector 0.59.2)). У меня есть модель для UserData:

public class UserData
{
    [DatabaseGenerated(DatabaseGeneratedOption.None)]
    public ulong ID { get; private set; }

    [Required]
    public Dictionary<string, InventoryItem> Inventory { get; set; }

    public UserData()
    {
        Data = new Dictionary<string, string>();
    }
}

У меня есть метод REST, который можно вызвать, чтобы добавить элементы в инвентарь пользователя:

using (var transaction = context.Database.BeginTransaction())
{
    UserData data = await context.UserData.FindAsync(userId);

    // there is code here to detect duplicate entries/etc, but I've removed it for brevity
    foreach (var item in items) data.Inventory.Add(item.ItemId, item);

    context.UserData.Update(data);
    await context.SaveChangesAsync();

    transaction.Commit();
}

Если два или более вызовов этого метод сделан с тем же идентификатором пользователя, тогда я получаю параллельный доступ (несмотря на транзакцию). Это приводит к тому, что данные иногда неверны. Например, если инвентарь пуст, а затем выполняется два вызова для одновременного добавления элементов (элемент A и элемент B), иногда база данных будет содержать только A или B, а не оба. Из журналирования видно, что EF может читать из базы данных, в то время как другие операции чтения / записи все еще происходят, в результате чего код имеет неправильное состояние инвентаря, когда он пытается выполнить обратную запись в базу данных. Поэтому я попытался пометить уровень изоляции как сериализуемый.

using (var transaction = context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))

Теперь я иногда вижу исключение:

MySql.Data.MySqlClient.MySqlException (0x80004005): Deadlock found when trying to get lock; try restarting transaction

Я не понимаю, как этот код мог зайти в тупик ... В любом случае, Я попытался продолжить, завернув все это в попытку / перехват и повторив попытку:

public static async Task<ResponseError> AddUserItem(Controller controller, MyContext context, ulong userId, List<InventoryItem> items, int retry = 5)
{
    ResponseError result = null;

    try
    {
        using (var transaction = context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))
        {
            UserData data = await context.UserData.FindAsync(userId);

            // there is code here to detect duplicate entries/etc, but I've removed it for brevity
            foreach (var item in items) data.Inventory.Add(item.ItemId, item);

            context.UserData.Update(data);
            await context.SaveChangesAsync();

            transaction.Commit();
        }
    }
    catch (Exception e)
    {
        if (retry > 0)
        {
            await Task.Delay(SafeRandomGenerator(10, 500));
            return await AddUserItem(controller, context, userId, items, retry--);
        }
        else
        {
            // store exception and return error
        }
    }

    return result;
}

И теперь я возвращаюсь к тому, что данные иногда верны, иногда нет. Поэтому я думаю, что тупик - это еще одна проблема, но это единственный метод доступа к этим данным. Итак, я в растерянности. Есть ли простой способ чтения из базы данных (блокировка строки в процессе), а затем обратная запись (снятие блокировки при записи) с использованием EF Core? Я рассмотрел использование токенов параллелизма, но это кажется излишним для того, что (на первый взгляд) кажется тривиальной задачей.

Я добавил протоколирование для разъема mysql, а также asp. net сервера и может увидеть следующую ошибку:

fail: Microsoft.EntityFrameworkCore.Database.Command[20102]
  => RequestId:0HLUD39EILP3R:00000001 RequestPath:/client/AddUserItem => Server.Controllers.ClientController.AddUserItem (ServerSoftware)
  Failed executing DbCommand (78ms) [Parameters=[@p1='?' (DbType = UInt64), @p0='?' (Size = 4000)], CommandType='Text', CommandTimeout='30']
  UPDATE `UserData` SET `Inventory` = @p0
  WHERE `ID` = @p1;
  SELECT ROW_COUNT();

Общий взлом состоит в том, чтобы просто немного задержать поступление запросов. Это работает, потому что клиент, скорее всего, будет генерировать эти вызовы при загрузке. Обычно двухсторонние звонки не ожидаются, поэтому распределяйте их вовремя, задерживая работы по прибытии. Тем не менее, я бы предпочел найти правильный подход, так как это только снижает вероятность возникновения проблемы:

ResponseError result = null;
await Task.Delay(SafeRandomGenerator(100, 500));
using (var transaction = context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))
// etc

1 Ответ

0 голосов
/ 23 марта 2020

Это не очень хороший ответ, потому что это не то, что я хотел сделать, но я опубликую его здесь, так как это решило мою проблему. Моя проблема заключалась в том, что я пытался прочитать строку базы данных, изменить ее в asp. net, а затем записать обратно, все в рамках одной транзакции, избегая при этом тупиков. Вспомогательное поле имеет тип JSON, а MySQL предоставляет некоторые функции JSON, чтобы помочь изменить это JSON непосредственно в базе данных. Это потребовало от меня написания SQL операторов напрямую вместо использования EF, но это сработало.

Первый трюк состоял в том, чтобы убедиться, что я могу создать строку, если она не существует, без необходимости транзакции и блокировки .

INSERT INTO UserData VALUES ({0},'{{}}','{{}}') ON DUPLICATE KEY UPDATE ID = {0};

Я использовал JSON_REMOVE для удаления ключей из поля JSON:

UPDATE UserData as S set S.Inventory = JSON_REMOVE(S.Inventory,{1}) WHERE S.ID = {0};

и JSON_SET для добавления / изменения записей:

UPDATE UserData as S set S.Inventory = JSON_SET(S.Inventory,{1},CAST({2} as JSON)) WHERE S.ID = {0};

Примечание , если вы используете EF Core и хотите вызвать его с помощью FromSql, вам нужно вернуть объект как часть вашего оператора SQL. Так что вам нужно будет добавить что-то подобное в каждый оператор SQL:

SELECT * from UserData where ID = {0} LIMIT 1;

Вот полный рабочий пример в качестве метода расширения:

public static async Task<UserData> FindOrCreateAsync(this IQueryable<UserData> table, ulong userId)
{
    string sql = "INSERT INTO UserData VALUES ({0},'{{}}','{{}}') ON DUPLICATE KEY UPDATE ID = {0}; SELECT * FROM UserData WHERE ID={0} LIMIT 1;";
    return await table.FromSql(sql, userId).SingleOrDefaultAsync();
}

public static async Task<UserData> JsonRemoveInventory(this DbSet<UserData> table, ulong userId, string key)
{
    if (!key.StartsWith("$.")) key = $"$.\"{key}\"";
    string sql = "UPDATE UserData as S set S.Inventory = JSON_REMOVE(S.Inventory,{1}) WHERE S.ID = {0}; SELECT * from UserData where ID = {0} LIMIT 1;";
    return await table.AsNoTracking().FromSql(sql, userId, key).SingleOrDefaultAsync();
}

Использование:

var data = await context.UserData.FindOrCreateAsync(userId);
await context.UserData.JsonRemoveInventory(userId, itemId);
...