Рекомендуемый способ закрытия избыточного объекта sql .Rows после процедуры Go - PullRequest
2 голосов
/ 21 февраля 2020

Я использую Go подпрограммы для параллельной отправки запросов на PostgreSQL главный и подчиненный узлы. Первый хост, который возвращает действительный результат, выигрывает. Случаи ошибок выходят за рамки этого вопроса.

Вызывающий объект - единственный, кто заботится о содержимом объекта *sql.Rows, поэтому намеренно моя функция не выполняет над ними никаких операций. Я использую буферизованные каналы для извлечения возвращаемых объектов из подпрограмм Go, поэтому не должно быть утечки подпрограммы Go. Сборка мусора должна позаботиться обо всем остальном.

Существует проблема, о которой я не учил должным образом: объекты Rows, оставшиеся в канале, никогда не закрываются. Когда я вызываю эту функцию из транзакции (только для чтения), tx.Rollback() возвращает ошибку для каждого экземпляра незамкнутого Rows объекта: "unexpected command tag SELECT".

Эта функция вызывается из объектов более высокого уровня:

func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
    rc := make(chan *sql.Rows, len(xs))
    ec := make(chan error, len(xs))
    for _, x := range xs {
        go func(x executor) {
            rows, err := x.QueryContext(ctx, query, args...)
            switch { // Make sure only one of them is returned
            case err != nil:
                ec <- err
            case rows != nil:
                rc <- rows
            }
        }(x)
    }

    var me MultiError
    for i := 0; i < len(xs); i++ {
        select {
        case err := <-ec:
            me.append(err)
        case rows := <-rc: // Return on the first success
            return rows, nil
        }
    }
    return nil, me.check()
}

Исполнителями могут быть *sql.DB, *sql.Tx или все, что соответствует интерфейсу:

type executor interface {
    ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
    QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
    QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

Logi отката c:

func (mtx MultiTx) Rollback() error {
    ec := make(chan error, len(mtx))
    for _, tx := range mtx {
        go func(tx *Tx) {
            err := tx.Rollback()
            ec <- err
        }(tx)
    }
    var me MultiError
    for i := 0; i < len(mtx); i++ {
        if err := <-ec; err != nil {
            me.append(err)
        }
    }
    return me.check()
}

MultiTx - это набор открытых транзакций на нескольких узлах. Это объект более высокого уровня, который вызывает multiQuery

Как лучше всего «очистить» неиспользуемые строки? Опции, о которых я думаю, не делать:

  1. Отмена контекста: я считаю, что он будет работать непоследовательно, возможно, несколько запросов уже вернулись к тому времени, когда cancel() называется
  2. Создать отложенная подпрограмма Go, которая продолжает истощать каналы и закрывать объекты строк: если узел БД медленно реагирует, Rollback() все еще вызывается до rows.Close()
  3. Используйте sync.WaitGroup где-то в тип MultiTx, возможно в сочетании с (2): это может привести к зависанию отката, если один из узлов не отвечает. Кроме того, я не был бы уверен, как бы это реализовать.
  4. Игнорировать ошибки отката: игнорирование ошибок никогда не звучит как хорошая идея, они есть по причине.

Каков был бы рекомендуемый способ приблизиться к этому?

Редактировать:

Как предложено @Peter, я попытался отменить контекст, но кажется, что это также делает недействительными все возвращенные строки из запрос. На rows.Scan Я получаю context canceled ошибку на вызывающей стороне более высокого уровня.

Это то, что я сделал до сих пор:

func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    rc := make(chan *sql.Rows, len(xs))
    ec := make(chan error, len(xs))
    for _, x := range xs {
        go func(x executor) {
            rows, err := x.QueryContext(ctx, query, args...)
            switch { // Make sure only one of them is returned
            case err != nil:
                ec <- err
            case rows != nil:
                rc <- rows
                cancel() // Cancel on success
            }
        }(x)
    }

    var (
        me   MultiError
        rows *sql.Rows
    )
    for i := 0; i < len(xs); i++ {
        select {
        case err := <-ec:
            me.append(err)
        case r := <-rc:
            if rows == nil { // Only use the first rows
                rows = r
            } else {
                r.Close() // Cleanup remaining rows, if there are any
            }
        }
    }
    if rows != nil {
        return rows, nil
    }

    return nil, me.check()
}

Редактировать 2:

@ Адриан упомянул:

мы не можем видеть код, который фактически использует что-либо из этого.

Этот код повторно используется методами типа. Сначала идет тип транзакции. Проблемы в этом вопросе появляются при использовании метода Rollback(), описанного выше.

// MultiTx holds a slice of open transactions to multiple nodes.
// All methods on this type run their sql.Tx variant in one Go routine per Node.
type MultiTx []*Tx

// QueryContext runs sql.Tx.QueryContext on the tranactions in separate Go routines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mtx MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    return multiQuery(ctx, mtx2Exec(mtx), query, args...)
}

Тогда есть:

// MultiNode holds a slice of Nodes.
// All methods on this type run their sql.DB variant in one Go routine per Node.
type MultiNode []*Node

// QueryContext runs sql.DB.QueryContext on the Nodes in separate Go routines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mn MultiNode) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    return multiQuery(ctx, nodes2Exec(mn), query, args...)
}

Эти методы публикуют c обертки вокруг multiQuery() функция. Теперь я понимаю, что просто отправка *Rows в буферный канал на d ie, на самом деле, утечка памяти. В случаях сделки становится ясно, как Rollback() начинает жаловаться. Но в варианте без транзакций *Rows внутри канала никогда не будет собирать мусор, поскольку драйвер может хранить ссылку на него до тех пор, пока не будет вызван rows.Close().

Я написал этот пакет для используется ORM, sqlboiler . Мой логик более высокого уровня c передает объект MultiTX в ORM. С этого момента у меня нет никакого явного контроля над возвращенным Rows. Упрощенный подход c состоял бы в том, что мой код более высокого уровня отменяет контекст до Rollback(), но мне это не нравится:

  1. Это дает неинтуитивный API. Этот (идиоматический c) подход сломался бы:
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
tx, _ := db.BeginTx(ctx)
defer tx.Rollback()

Интерфейсы ORM также задают обычные, не зависящие от контекста варианты Query(), которые в моем случае будут работать с context.Background().

Я начинаю беспокоиться, что это сломано В любом случае, я начну с реализации подпрограммы Go, которая опустошит канал и закроет *Rows. После этого я посмотрю, смогу ли я реализовать какой-нибудь разумный механизм ожидания / отмены, который не повлияет на возвращаемый *Rows

Ответы [ 2 ]

1 голос
/ 23 февраля 2020

Я думаю, что приведенная ниже функция будет делать то, что вам нужно, с одним условием, что передаваемый контекст должен быть отменен, когда вы закончите с результатами (в противном случае один context.WithCancel будет просочен; я не могу обойти это так как отмена его внутри функции сделает недействительным возвращенное значение sql.Rows).

Обратите внимание, что у меня не было времени проверить это (потребуется настроить базу данных, реализовать ваши интерфейсы и т. д. c), поэтому что ж, в коде скрыта ошибка (но я верю, что алгоритм basi c надежен)

// queryResult holds the goroutine# and the result from that gorouting (need both so we can avoid cancelling the relevant context)
type queryResult struct {
    no   int
    rows *sql.Rows
}

// multiQuery - Executes multiple queries and returns either the first to resutn a result or, if all fail, a multierror summarising the errors
// Important: This should be used for READ ONLY queries only (it is possible that more than one will complete)
// Note: The ctx passed in must be cancelled to avoid leaking a context (this routine cannot cancel the context used for the winning query)
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
    noOfQueries := len(xs)
    rc := make(chan queryResult) // Channel for results; unbuffered because we only want one, and only one, result
    ec := make(chan error)       // errors get sent here - goroutines must send a result or 1 error
    defer close(ec)              // Ensure the error consolidation go routine will complete

    // We need a way to cancel individual goroutines as we do not know which one will succeed
    cancelFns := make([]context.CancelFunc, noOfQueries)

    // All goroutines must terminate before we exit (otherwise the transaction maybe rolled back before they are cancelled leading to "unexpected command tag SELECT")
    var wg sync.WaitGroup
    wg.Add(noOfQueries)

    for i, x := range xs {
        var queryCtx context.Context
        queryCtx, cancelFns[i] = context.WithCancel(ctx)
        go func(ctx context.Context, queryNo int, x executor) {
            defer wg.Done()

            rows, err := x.QueryContext(ctx, query, args...)
            if err != nil {
                ec <- err // Error collection go routine guaranteed to run until all query goroutines complete
                return
            }

            select {
            case rc <- queryResult{queryNo, rows}:
                return
            case <-ctx.Done(): // If another query has already transmitted its results these should be thrown away
                rows.Close() // not strictly required because closed context should tidy up
                return
            }
        }(queryCtx, i, x)
    }

    // Start go routine that will send a MultiError to a channel if all queries fail
    mec := make(chan MultiError)
    go func() {
        var me MultiError
        errCount := 0
        for err := range ec {
            me.append(err)
            errCount += 1
            if errCount == noOfQueries {
                mec <- me
                return
            }

        }
    }()

    // Wait for one query to succeed or all queries to fail
    select {
    case me := <-mec:
        for _, cancelFn := range cancelFns { // not strictly required so long as ctx is eventually cancelled
            cancelFn()
        }
        wg.Wait()
        return nil, me.check()
    case result := <-rc:
        for i, cancelFn := range cancelFns { // not strictly required so long as ctx is eventually cancelled
            if i != result.no { // do not cancel the query that returned a result
                cancelFn()
            }
        }
        wg.Wait()
        return result.rows, nil
    }
}
0 голосов
/ 24 февраля 2020

Благодаря комментариям @Peter и ответу @Brits я получил sh идеи о том, как к этому подойти.

Blue print

3 из 4 предложений от вопрос нужно было реализовать.

1. Отмена контекста

mtx.QueryContext() создает дочерний контекст и устанавливает CancelFunc в объекте MultiTx.

Помощник cancelWait() отменяет старый контекст и ожидает MultiTX.Done если это не ноль. Он вызывается Rollback() и перед каждым новым запросом.

2. Слить канал

в multiQuery(). После получения первого успешного Rows запускается процедура Go для слива и закрытия оставшихся Rows. Канал строк больше не нуждается в буферизации.

Дополнительная подпрограмма Go и WaitGroup используются для закрытия каналов ошибок и строк.

3. Возвращает готовый канал

Вместо предложенного WaitGroup, multiQuery() возвращает готовый канал. Канал закрывается после завершения процедуры слива и закрытия. mtx.QueryContext() устанавливает готовый канал на объекте MultiTx.

Ошибки

Вместо блока select только канал ошибок сливается, если сейчас есть Rows. По этой причине ошибка должна оставаться в буфере.

Код

// MultiTx holds a slice of open transactions to multiple nodes.
// All methods on this type run their sql.Tx variant in one Go routine per Node.
type MultiTx struct {
    tx      []*Tx
    done    chan struct{}
    cancels context.CancelFunc
}

func (m *MultiTx) cancelWait() {
    if m.cancel != nil {
        m.cancel()
    }
    if m.done != nil {
        <-m.done
    }

    // reset
    m.done, m.cancel = nil, nil
}

// Context creates a child context and appends CancelFunc in MultiTx
func (m *MultiTx) context(ctx context.Context) context.Context {
    m.cancelWait()
    ctx, m.cancel = context.WithCancel(ctx)
    return ctx
}

// QueryContext runs sql.Tx.QueryContext on the tranactions in separate Go routines.
func (m *MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error) {
    rows, m.done, err = multiQuery(m.context(ctx), mtx2Exec(m.tx), query, args...)
    return rows, err
}

func (m *MultiTx) Rollback() error {
    m.cancelWait()
    ec := make(chan error, len(m.tx))
    for _, tx := range m.tx {
        go func(tx *Tx) {
            err := tx.Rollback()
            ec <- err
        }(tx)
    }
    var me MultiError
    for i := 0; i < len(m.tx); i++ {
        if err := <-ec; err != nil {
            me.append(err)
        }
    }
    return me.check()
}

func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, chan struct{}, error) {
    rc := make(chan *sql.Rows)
    ec := make(chan error, len(xs))

    var wg sync.WaitGroup
    wg.Add(len(xs))
    for _, x := range xs {
        go func(x executor) {
            rows, err := x.QueryContext(ctx, query, args...)
            switch { // Make sure only one of them is returned
            case err != nil:
                ec <- err
            case rows != nil:
                rc <- rows
            }
            wg.Done()
        }(x)
    }

    // Close channels when all query routines completed
    go func() {
        wg.Wait()
        close(ec)
        close(rc)
    }()

    rows, ok := <-rc
    if ok { // ok will be false if channel closed before any rows
        done := make(chan struct{}) // Done signals the caller that all remaining rows are properly closed
        go func() {
            for rows := range rc { // Drain channel and close unused Rows
                rows.Close()
            }
            close(done)
        }()
        return rows, done, nil
    }

    // no rows, build error return
    var me MultiError
    for err := range ec {
        me.append(err)
    }
    return nil, nil, me.check()
}

Редактировать: Отмена и ожидание старых контекстов перед каждым запросом, например *sql.Tx не Go обычное сохранение, все предыдущие запросы должны быть выполнены до следующего вызова.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...