Потеря данных с горутин - PullRequest
0 голосов
/ 20 января 2020

Я пишу AWS лямбда-код для запроса таблицы RDS, преобразовать его в JSON и вернуть. Но я не вижу всех записей в JSON, чем то, что вернул запрос SQL. Скажем, я запрашиваю 1500 записей из таблицы, но в JSON (на 0-5 записей меньше) каждый раз только 1496-1500 записей. Я сомневаюсь, что что-то напутал с sync.WaitGroup.

Ниже SQL Запрос к серверу

SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0

Ниже мой код

// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {

    receiver := make([]string, totalColumns)

    is := make([]interface{}, len(receiver))
    for i := range is {
        is[i] = &receiver[i]
    }

    err := rows.Scan(is...)

    if err != nil {
        fmt.Println("Error reading rows: " + err.Error())
    }

    TotalRecordsInParseRowfunction++
    return receiver
}

// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {

    // Query Table
    rows, err := conn.Query(query)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    println("Rows:", rows)
    defer rows.Close()

    // Get the column names
    columns, err := rows.Columns()
    // fmt.Println("columns", columns)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    totalColumns := len(columns)
    var resp []map[string]string // Declare the type of final response which will be used to create JSON
    var waitgroup sync.WaitGroup

    // Iterate over all the rows returned
    for rows.Next() {

        waitgroup.Add(1)
        TotalRecordsCount++
        row := parseRow(rows, totalColumns)

        go func() {
            // Create a map of the row
            respRow := map[string]string{} // Declare the type of each row of response
            for count := range row {
                respRow[columns[count]] = row[count]
            }
            // fmt.Println("\n\nrespRow", respRow)

            resp = append(resp, respRow)
            TotalRecordsAppendedCount++
            waitgroup.Done()
        }()
    }
    waitgroup.Wait()

    // If no rows are returned
    if len(resp) == 0 {
        fmt.Println("MESSAGE: No records are available")
        return "", errors.New("MESSAGE: No records are available")
    }

    // Create JSON
    respJSON, _ := json.Marshal(resp)
    fmt.Println("Response", string(respJSON))

    fmt.Println("\n--------------Summary---------------")
    fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
    fmt.Println("TotalRecordsCount", TotalRecordsCount)
    fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
    fmt.Println("Object Length", len(resp))

    return string(respJSON), nil // Return JSON

}

Ниже приводится сводная информация о выходе, которую я получаю

--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496

1 Ответ

4 голосов
/ 20 января 2020

Твой код расистский. Несколько процедур пишут в resp без какого-либо взаимного исключения, поэтому вы теряете данные.

Вы можете добавить мьютекс-блокировку-разблокировку вокруг этого. Тем не менее, код, который вы имеете в goroutine, не гарантирует его собственную goroutine, потому что это простое дополнение карты. Работать с этим кодом в goroutine будет намного проще, и, вероятно, он будет работать намного быстрее без затрат на планирование goroutine. Если вы не планируете иметь больше логики c в этой процедуре, я предлагаю вам удалить ее.

Вот еще немного информации о том, что, вероятно, происходит: прежде всего, в текущей версии go, программа будет уступать другим только тогда, когда эта программа вызывает некоторые функции библиотеки. Глядя на код, маловероятно, что ваши горутины когда-нибудь дадут. Поскольку вы уже наблюдали потерю данных (что означает, что существует состояние гонки), у вас, вероятно, более одного ядра.

Гонка здесь:

resp = append(resp, respRow)

Без взаимного исключения, одна программа может посмотреть на resp, видит, что может записать в свой n -й элемент. Другая программа (работающая на отдельном ядре) может сделать то же самое и успешно записывает туда. Но первая программа все еще думает, что элемент пуст, поэтому перезаписывает его и обновляет resp. Когда это происходит, вы теряете один элемент.

Если вы добавите взаимное исключение в этот код, вы по существу заставите все программы запускаться последовательно, потому что они на самом деле ничего не делают. Кроме того, поскольку порядок выполнения goroutine является случайным, вы получите случайный порядок resp. Короче говоря, это один из тех случаев, когда вы должны выполнять код последовательно.

...