Ограничить максимальное количество подготовленных выписок - PullRequest
0 голосов
/ 02 января 2019

Проблема

Я написал приложение, которое синхронизирует данные из BigQuery в базу данных MySQL. Я пытаюсь вставлять примерно 10-20 тыс. Строк партиями (до 10 позиций в каждой партии) каждые 3 часа. По какой-то причине я получаю следующую ошибку при попытке вставить эти строки в MySQL:

Невозможно создать больше, чем max_prepared_stmt_count операторов:

Ошибка 1461: не могу создать больше, чем max_prepared_stmt_count операторов (текущее значение: 2000)

Мой "соответствующий код"

// ProcessProjectSkuCost receives the given sku cost entries and sends them in batches to upsertProjectSkuCosts()
func ProcessProjectSkuCost(done <-chan bigquery.SkuCost) {
    var skuCosts []bigquery.SkuCost
    var rowsAffected int64
    for skuCostRow := range done {
        skuCosts = append(skuCosts, skuCostRow)

        if len(skuCosts) == 10 {
            rowsAffected += upsertProjectSkuCosts(skuCosts)
            skuCosts = []bigquery.SkuCost{}
        }
    }
    if len(skuCosts) > 0 {
        rowsAffected += upsertProjectSkuCosts(skuCosts)
    }
    log.Infof("Completed upserting project sku costs. Affected rows: '%d'", rowsAffected)
}

// upsertProjectSkuCosts inserts or updates ProjectSkuCosts into SQL in batches
func upsertProjectSkuCosts(skuCosts []bigquery.SkuCost) int64 {
    // properties are table fields
    tableFields := []string{"project_name", "sku_id", "sku_description", "usage_start_time", "usage_end_time",
        "cost", "currency", "usage_amount", "usage_unit", "usage_amount_in_pricing_units", "usage_pricing_unit",
        "invoice_month"}
    tableFieldString := fmt.Sprintf("(%s)", strings.Join(tableFields, ","))

    // placeholderstring for all to be inserted values
    placeholderString := createPlaceholderString(tableFields)
    valuePlaceholderString := ""
    values := []interface{}{}
    for _, row := range skuCosts {
        valuePlaceholderString += fmt.Sprintf("(%s),", placeholderString)
        values = append(values, row.ProjectName, row.SkuID, row.SkuDescription, row.UsageStartTime,
            row.UsageEndTime, row.Cost, row.Currency, row.UsageAmount, row.UsageUnit,
            row.UsageAmountInPricingUnits, row.UsagePricingUnit, row.InvoiceMonth)
    }
    valuePlaceholderString = strings.TrimSuffix(valuePlaceholderString, ",")

    // put together SQL string
    sqlString := fmt.Sprintf(`INSERT INTO
        project_sku_cost %s VALUES %s ON DUPLICATE KEY UPDATE invoice_month=invoice_month`, tableFieldString, valuePlaceholderString)
    sqlString = strings.TrimSpace(sqlString)

    stmt, err := db.Prepare(sqlString)
    if err != nil {
        log.Warn("Error while preparing SQL statement to upsert project sku costs. ", err)
        return 0
    }

    // execute query
    res, err := stmt.Exec(values...)
    if err != nil {
        log.Warn("Error while executing statement to upsert project sku costs. ", err)
        return 0
    }

    rowsAffected, err := res.RowsAffected()
    if err != nil {
        log.Warn("Error while trying to access affected rows ", err)
        return 0
    }

    return rowsAffected
}

// createPlaceholderString creates a string which will be used for prepare statement (output looks like "(?,?,?)")
func createPlaceholderString(tableFields []string) string {
    placeHolderString := ""
    for range tableFields {
        placeHolderString += "?,"
    }
    placeHolderString = strings.TrimSuffix(placeHolderString, ",")

    return placeHolderString
}

Мой вопрос:

Почему я нажимаю max_prepared_stmt_count, когда немедленно выполняю подготовленный оператор (см. Функцию upsertProjectSkuCosts)?

Я мог только предположить, что это своего рода параллелизм, который создает тонны подготовленных утверждений между подготовкой и выполнением всех этих утверждений. С другой стороны, я не понимаю, почему было бы так много параллелизма, поскольку канал в ProcessProjectSkuCost является буферизованным каналом с размером 20.

1 Ответ

0 голосов
/ 02 января 2019

Вам нужно закрыть оператор внутри upsertProjectSkuCosts() (или использовать его повторно - см. Конец этого поста).

Когда вы звоните db.Prepare(), соединение берется из пула внутренних соединений(или создается новое соединение, если нет свободных соединений).Затем оператор подготовлен для этого соединения (если это соединение не является свободным при вызове stmt.Exec(), оператор затем также готовится для другого соединения).Так что это создает заявление в вашей базе данных для этого соединения.Это утверждение волшебным образом не исчезнет - наличие нескольких подготовленных утверждений в соединении совершенно допустимо.Golang может увидеть, что stmt выходит из области видимости, увидеть, что требуется некоторая очистка, а затем выполнить эту очистку, но Golang этого не делает (точно так же, как он не закрывает файлы для вас и тому подобное).тот).Так что вам нужно сделать это самостоятельно, используя stmt.Close().Когда вы вызываете stmt.Close(), драйвер отправит команду серверу базы данных, сообщив, что оператор больше не нужен.

Самый простой способ сделать это - добавить defer stmt.Close() после errпроверьте следующее db.Prepare().

Что вы также можете сделать, это подготовить заявление один раз и сделать его доступным для upsertProjectSkuCosts (либо передавая stmt в upsertProjectSkuCosts, либо сделав upsertProjectSkuCosts aфункция структуры, поэтому структура может иметь свойство для stmt).Если вы сделаете это, вы должны не позвонить stmt.Close() - поскольку вы больше не создаете новые операторы, вы повторно используете существующий оператор.

Также см. Следуетмы также закрываем .Prepare () БД на Голанге? и https://groups.google.com/forum/#!topic/golang-nuts/ISh22XXze-s

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...