Поддерживать целостность при одновременных обновлениях одной и той же строки - PullRequest
1 голос
/ 21 февраля 2020

В следующем фрагменте кода я пытаюсь найти, удалить и создать один и тот же элемент, но в 2 разных транзакциях в 2 разных потоках.

В Потоке 1 я создаю транзакцию 1, найдите элемент и удалите его.

Как только это будет сделано, я разрешаю потоку 2 создать транзакцию 2 и попытаюсь найти элемент. Метод Find() здесь блокируется, так как я использую опцию FOR UPDATE.

. Вернувшись в поток 1, элемент воссоздается и транзакция 1 фиксируется, что позволяет Find() в теме 2 для завершения. Вот проблемы, которые там возникают:

Если я использую уровень изоляции "ReadCommitted", я получаю ошибку not found - это не имеет смысла для меня, потому что я думал, что транзакция ReadCommitted можно увидеть обновления, примененные другими.

Если я использую уровень изоляции «Сериализуемый», я получаю сообщение об ошибке: pq: could not serialize access due to concurrent update.

Почему я вижу такое поведение? Я бы подумал, что после второго поиска разблокировки он должен предоставить мне самую последнюю строку.

Как я могу сделать так, чтобы, когда строка находится в процессе изменения, любые другие операции чтения блокировались, и разблокировать возврат самых актуальных данных по завершении в других темах?

db, err := gorm.Open("postgres", "host=localHost port=5432 user=postgres dbname=test-rm password=postgres sslmode=disable")
if err != nil { panic("failed to connect database") }
db.SingularTable(true)
db.DropTableIfExists(&Product{})
db.AutoMigrate(&Product{})

db.Create(&Product{Code: "A", Price: 1000})
// SQL: INSERT  INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"

txOpt := &sql.TxOptions{Isolation: sql.LevelSerializable}

doneTrans1 := make(chan struct{})

go func(){
    item1 := &Product{}

    tx1 := db.BeginTx(context.Background(), txOpt)

    err = tx1.Set("gorm:query_option", "FOR UPDATE").Find(item1, "code = ?", "A").Error
    // SQL: SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE

    item1.Price = 3000

    err = tx1.Delete(item1).Error
    // SQL: DELETE FROM "product"  WHERE "product"."id" = 1

    doneTrans1<-struct{}{}
    time.Sleep(time.Second * 3)

    err = tx1.Create(item1).Error
    // SQL: INSERT  INTO "product" ("id","code","price") VALUES (1,'A',3000) RETURNING "product"."id"

    tx1.Commit()
}()

// Ensure other trans work started
<-doneTrans1
time.Sleep(time.Second * 2)

item2 := &Product{}

tx2 := db.BeginTx(context.Background(), txOpt)

err = tx2.Set("gorm:query_option", "FOR UPDATE").Find(item2, "code = ?", "A").Error
// SQL: SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE
// ERROR occurs here

item2.Price = 5000
err = tx2.Delete(item2).Error
err = tx2.Create(item2).Error
tx2.Commit()
time.Sleep(time.Second * 5)

Ответы [ 2 ]

3 голосов
/ 21 февраля 2020

Чтобы ответить на этот вопрос, я думаю, что лучше удалить сложность горутина (и, вообще-то, go вообще) и сосредоточиться на SQL. Ниже приведены операторы SQL в порядке их запуска (я проигнорировал все после возникновения ошибки, поскольку это в основном не имеет значения, и порядок выполнения становится сложным / переменным!).

In основная процедура

INSERT  INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"

In GoRoutine

BEGIN TX1
SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE
DELETE FROM "product"  WHERE "product"."id" = 1

В основной процедуре

BEGIN TX2
SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE -- ERROR occurs here

на ваши вопросы.

Вопрос 1

Если я использую уровень изоляции «ReadCommitted», я получаю ошибку «not found» - в этом нет смысла я, потому что я думал, что транзакция ReadCommitted может видеть обновления, примененные другими.

Из документов для Read Committed Isolation Level :

UPDATE Команды, DELETE, SELECT FOR UPDATE и SELECT FOR SHARE ведут себя так же, как и SELECT, в отношении поиска целевых строк: они будут находить только те целевые строки, которые были зафиксированы на момент запуска команды. Однако такая целевая строка, возможно, уже была обновлена ​​(или удалена, или заблокирована) другой параллельной транзакцией к моменту ее обнаружения. В этом случае потенциальный обновитель будет ожидать первой транзакции обновления, чтобы зафиксировать или откатить (если она все еще выполняется). Если первый обновитель откатывается, то его эффекты отменяются, и второй обновитель может продолжить обновление первоначально найденной строки. Если первый обновитель фиксирует, второй обновитель проигнорирует строку, если первый обновитель удалил ее, в противном случае он попытается применить свою операцию к обновленной версии строки.

Таким образом, SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE в TX2 будет ждать завершения TX1. В этот момент TX1 удалил продукт A, поэтому строка игнорируется и результаты не возвращаются. Теперь я понимаю, что TX1 также воссоздает продукт A, но помните, что «запрос SELECT (без предложения FOR UPDATE / SHARE) видит только данные, зафиксированные до начала запроса»; и так как выбор начался до того, как TX1 воссоздает запись, она не будет видна.

Вопрос 2

Если я использую уровень изоляции "Serializable", я получаю ошибка: pq: не удалось сериализовать доступ из-за одновременного обновления.

Из документов для Уровень изоляции Repeatable Read * (Serializable - более высокий уровень, поэтому эти правила плюс некоторые более строгие те, которые применяются):

Команды UPDATE, DELETE, SELECT FOR UPDATE и SELECT FOR SHARE ведут себя так же, как и SELECT в плане поиска целевых строк: они найдут только те целевые строки, которые были зафиксированы как времени начала транзакции. Однако такая целевая строка, возможно, уже была обновлена ​​(или удалена, или заблокирована) другой параллельной транзакцией к моменту ее обнаружения. В этом случае повторяемая транзакция чтения будет ожидать, когда первая обновляющая транзакция будет зафиксирована или откатана (если она все еще выполняется). Если первый модуль обновления откатывается назад, то его эффекты сводятся на нет, и повторяемая транзакция чтения может продолжить обновление первоначально найденной строки. Но если первый модуль обновления фиксирует (и фактически обновил или удалил строку, а не просто заблокировал ее), то повторяемая транзакция чтения будет откатана с сообщением

В вашем коде TX1 обновляет продукт A, означая, что запрос в TX2 будет отложен до тех пор, пока TX1 не завершит свою работу, и в этот момент он прервется с ошибкой (если откат TX1 откатится, то он продолжится).

Как я могу выполнить второе обновление? *

Поддержание целостности транзакций является сложной проблемой, а функциональность в PostgreSQL является результатом большой работы некоторых очень умных людей. Если вы сталкиваетесь с базой данных, часто неплохо сделать шаг назад и подумать, нужно ли вам изменить свой подход (или если проблема, которую вы воспринимаете, является реальной проблемой).

В вашем примере у вас есть две подпрограммы, которые удаляют и воссоздают одну и ту же запись; Я не могу предвидеть ситуацию, когда вы хотите, чтобы обе транзакции продолжались. В реальной системе, где это было возможно, у вас не было бы тщательно настроенных таймеров, обеспечивающих начало одной транзакции. Это будет означать, что состояние базы данных после завершения транзакции будет зависеть от того, кто первым достиг SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE. Так что в действительности это не имеет значения, если кто-то потерпит неудачу (потому что результат в любом случае случайный); на самом деле это лучший результат, потому что вы можете посоветовать пользователю (который может проверить запись и перезапустить задачу при необходимости).

Поэтому, прежде чем читать остальную часть этого, я бы посоветовал вам подумать, если это проблема вообще (у меня нет сведений о том, что вы пытаетесь выполнить sh, поэтому его сложно комментировать).

Если вы действительно хотите обеспечить обновление, у вас есть несколько вариантов:

  • Если вы используете «Сериализуемый», вам нужно обнаружить сбой и повторить транзакцию (если этого требует бизнес-логика c)
  • Если вы используете «Передать чтение», замените УДАЛИТЬ / ВСТАВИТЬ на ОБНОВЛЕНИЕ (в этом случае PostgreSQL пересмотрит предложение WHERE, когда будет снята блокировка первой транзакции).

Однако я считаю, что лучший подход - избавиться от большей части этого и попытаться выполнить такие обновления за один шаг (что может означать обход ORM). Если вы хотите минимизировать вероятность подобных проблем, важно минимизировать количество / продолжительность блокировок, и выполнение операции за один шаг значительно помогает. Для сложных операций использование хранимой процедуры ускоряет процесс, но все еще существует (уменьшенная) вероятность конфликта с другими одновременно выполняющимися операциями.

Возможно, вы также захотите взглянуть на Optimisti c Блокировка , потому что в некоторых случаях это имеет больше смысла (например, когда вы читаете информацию, отображаете ее для пользователя и ждете изменений, но в то же время другой пользователь мог внести изменения).

0 голосов
/ 21 февраля 2020

Может быть, я неправильно понимаю - я раньше не использовал Горм. Однако из ваших комментариев к запросу обе транзакции в двух ваших подпрограммах имеют «SELECT .. FOR UPDATE», и они выполняются параллельно. Ваша основная программа не ждет, пока транзакция, запущенная внутри вашей второй программы, будет подтверждена, прежде чем пытаться «ВЫБРАТЬ .. ДЛЯ ОБНОВЛЕНИЯ» тех же строк.

Согласно вашему объяснению, возможно, вы включили «ДЛЯ ОБНОВЛЕНИЯ» в вторая программа по ошибке.

Или вы можете использовать блокировку syn c .Mutex во второй процедуре и освободить ее после фиксации. Пока главная программа ожидает получения блокировки и только затем выполняет свой запрос.

...