Я использую Go подпрограммы для параллельной отправки запросов на PostgreSQL главный и подчиненный узлы. Первый хост, который возвращает действительный результат, выигрывает. Случаи ошибок выходят за рамки этого вопроса.
Вызывающий объект - единственный, кто заботится о содержимом объекта *sql.Rows
, поэтому намеренно моя функция не выполняет над ними никаких операций. Я использую буферизованные каналы для извлечения возвращаемых объектов из подпрограмм Go, поэтому не должно быть утечки подпрограммы Go. Сборка мусора должна позаботиться обо всем остальном.
Существует проблема, о которой я не учил должным образом: объекты Rows, оставшиеся в канале, никогда не закрываются. Когда я вызываю эту функцию из транзакции (только для чтения), tx.Rollback()
возвращает ошибку для каждого экземпляра незамкнутого Rows
объекта: "unexpected command tag SELECT"
.
Эта функция вызывается из объектов более высокого уровня:
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
rc := make(chan *sql.Rows, len(xs))
ec := make(chan error, len(xs))
for _, x := range xs {
go func(x executor) {
rows, err := x.QueryContext(ctx, query, args...)
switch { // Make sure only one of them is returned
case err != nil:
ec <- err
case rows != nil:
rc <- rows
}
}(x)
}
var me MultiError
for i := 0; i < len(xs); i++ {
select {
case err := <-ec:
me.append(err)
case rows := <-rc: // Return on the first success
return rows, nil
}
}
return nil, me.check()
}
Исполнителями могут быть *sql.DB
, *sql.Tx
или все, что соответствует интерфейсу:
type executor interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
Logi отката c:
func (mtx MultiTx) Rollback() error {
ec := make(chan error, len(mtx))
for _, tx := range mtx {
go func(tx *Tx) {
err := tx.Rollback()
ec <- err
}(tx)
}
var me MultiError
for i := 0; i < len(mtx); i++ {
if err := <-ec; err != nil {
me.append(err)
}
}
return me.check()
}
MultiTx
- это набор открытых транзакций на нескольких узлах. Это объект более высокого уровня, который вызывает multiQuery
Как лучше всего «очистить» неиспользуемые строки? Опции, о которых я думаю, не делать:
- Отмена контекста: я считаю, что он будет работать непоследовательно, возможно, несколько запросов уже вернулись к тому времени, когда
cancel()
называется - Создать отложенная подпрограмма Go, которая продолжает истощать каналы и закрывать объекты строк: если узел БД медленно реагирует,
Rollback()
все еще вызывается до rows.Close()
- Используйте
sync.WaitGroup
где-то в тип MultiTx, возможно в сочетании с (2): это может привести к зависанию отката, если один из узлов не отвечает. Кроме того, я не был бы уверен, как бы это реализовать. - Игнорировать ошибки отката: игнорирование ошибок никогда не звучит как хорошая идея, они есть по причине.
Каков был бы рекомендуемый способ приблизиться к этому?
Редактировать:
Как предложено @Peter, я попытался отменить контекст, но кажется, что это также делает недействительными все возвращенные строки из запрос. На rows.Scan
Я получаю context canceled
ошибку на вызывающей стороне более высокого уровня.
Это то, что я сделал до сих пор:
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
rc := make(chan *sql.Rows, len(xs))
ec := make(chan error, len(xs))
for _, x := range xs {
go func(x executor) {
rows, err := x.QueryContext(ctx, query, args...)
switch { // Make sure only one of them is returned
case err != nil:
ec <- err
case rows != nil:
rc <- rows
cancel() // Cancel on success
}
}(x)
}
var (
me MultiError
rows *sql.Rows
)
for i := 0; i < len(xs); i++ {
select {
case err := <-ec:
me.append(err)
case r := <-rc:
if rows == nil { // Only use the first rows
rows = r
} else {
r.Close() // Cleanup remaining rows, if there are any
}
}
}
if rows != nil {
return rows, nil
}
return nil, me.check()
}
Редактировать 2:
@ Адриан упомянул:
мы не можем видеть код, который фактически использует что-либо из этого.
Этот код повторно используется методами типа. Сначала идет тип транзакции. Проблемы в этом вопросе появляются при использовании метода Rollback()
, описанного выше.
// MultiTx holds a slice of open transactions to multiple nodes.
// All methods on this type run their sql.Tx variant in one Go routine per Node.
type MultiTx []*Tx
// QueryContext runs sql.Tx.QueryContext on the tranactions in separate Go routines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mtx MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return multiQuery(ctx, mtx2Exec(mtx), query, args...)
}
Тогда есть:
// MultiNode holds a slice of Nodes.
// All methods on this type run their sql.DB variant in one Go routine per Node.
type MultiNode []*Node
// QueryContext runs sql.DB.QueryContext on the Nodes in separate Go routines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mn MultiNode) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return multiQuery(ctx, nodes2Exec(mn), query, args...)
}
Эти методы публикуют c обертки вокруг multiQuery()
функция. Теперь я понимаю, что просто отправка *Rows
в буферный канал на d ie, на самом деле, утечка памяти. В случаях сделки становится ясно, как Rollback()
начинает жаловаться. Но в варианте без транзакций *Rows
внутри канала никогда не будет собирать мусор, поскольку драйвер может хранить ссылку на него до тех пор, пока не будет вызван rows.Close()
.
Я написал этот пакет для используется ORM, sqlboiler . Мой логик более высокого уровня c передает объект MultiTX
в ORM. С этого момента у меня нет никакого явного контроля над возвращенным Rows
. Упрощенный подход c состоял бы в том, что мой код более высокого уровня отменяет контекст до Rollback()
, но мне это не нравится:
- Это дает неинтуитивный API. Этот (идиоматический c) подход сломался бы:
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
tx, _ := db.BeginTx(ctx)
defer tx.Rollback()
Интерфейсы ORM также задают обычные, не зависящие от контекста варианты
Query()
, которые в моем случае будут работать с
context.Background()
.
Я начинаю беспокоиться, что это сломано В любом случае, я начну с реализации подпрограммы Go, которая опустошит канал и закроет *Rows
. После этого я посмотрю, смогу ли я реализовать какой-нибудь разумный механизм ожидания / отмены, который не повлияет на возвращаемый *Rows