Безопасны ли sqlx.Next и sqlx.StructScan для одновременного использования? - PullRequest
1 голос
/ 13 июля 2020

У меня есть большая таблица в базе данных MySQL, из которой я пытаюсь читать как можно эффективнее. Я думал о том, чтобы ускорить код, добавив несколько рабочих, но когда я это сделаю, я получаю ошибки маршалинга в начале его запуска (и только в начале). Это выглядит примерно так:

{ «вызывающий»: «mysql. go: репозиторий. (* MySQLRepo) .GetNextBatch # 428», «error»: «DBGetRecordException: не удалось выполнить маршалинг комментариев к эпизоду: sql: ошибка сканирования в индексе столбца 4, имя «created_at»: неподдерживаемое сканирование, сохранение драйвера. Тип значения [] uint8 в тип * time.Time »,« level »:« error »,« ts »:« 2020-07-13T20: 42: 03.9621 658Z »}

Я не понимаю, что это ошибка, если я удалю рабочий код из ImportLegacyComments и просто l oop поверх него обычно. Безопасны ли sqlx.next и sqlx.StructScan для многопоточности, и если нет, есть ли альтернативный способ сделать это безопасно?

import (
    _ "github.com/go-sql-driver/mysql"
    "github.com/jmoiron/sqlx"
)

type BatchResult struct {
    rows *sqlx.Rows
}

func (m *MySQLRepo) GetNextBatch(b *BatchResult) ([]model.EpisodeComment, error) {
    var episodeComments []model.EpisodeComment
    for i := 0; i < 1000 && b.rows.Next(); i++ {
        var episodeComment model.EpisodeComment
        err := b.rows.StructScan(&episodeComment)
        if err != nil {
            return nil, err
        }
        episodeComments = append(episodeComments, episodeComment)
    }
    return episodeComments, nil
}
func (m *MySQLRepo) FetchAllEpisodeComments() (*BatchResult, error) {
    rows, err := m.db.Queryx("SELECT * FROM episode_comment")
    if err != nil {
        return nil, err
    }
    return &BatchResult{
        rows: rows,
    }, nil
}

func (svc *ImportService) ImportLegacyComments(ctx context.Context) error {
    batchResult, err := svc.legacyCommentsRepo.FetchAllEpisodeComments()
    var wg sync.WaitGroup
    processor := func() {
        comments, err := svc.legacyCommentsRepo.GetNextBatch(batchResult)
        if err != nil {
            svc.logger.Error(err)
        }
        for len(comments) > 0 {
            comments, err = svc.legacyCommentsRepo.GetNextBatch(batchResult)
            if err != nil {
                svc.logger.Error(err)
            }
            svc.logger.Info("batch", "completed 1000")
        }
        wg.Done()
    }

    for i := 0; i < 20; i++ {
        go processor()
        wg.Add(1)
    }

    wg.Wait()

    return err
}

1 Ответ

1 голос
/ 14 июля 2020

sqlx.Next и sqlx.StructScan небезопасны для одновременного использования.

Если вы соберете простой модульный тест для своего кода и запустите его с детектором гонки go test -race, он сообщит о гонке условие для неэкспортируемого поля структуры "database/sql".Rows:

Write at 0x00c00000e080 by goroutine 22:
  github.com/lib/pq.(*rows).Next()
      /Users/blackgreen/go/pkg/mod/github.com/lib/pq@v1.2.0/conn.go:1464 +0x8ec
  ...

Previous read at 0x00c00000e080 by goroutine 20:
  database/sql.(*Rows).Scan()
      /usr/local/go/src/database/sql/sql.go:3041 +0x2fa
  ...

Если мы go выясним, какое поле вызывает жалобы детектора гонки, мы увидим, что указание на одновременное использование правильно задокументировано:

    // lastcols is only used in Scan, Next, and NextResultSet which are expected
    // not to be called concurrently.
    lastcols []driver.Value
...