Неопределенное поведение при загрузке большого CSV одновременно с использованием Goroutines - PullRequest
0 голосов
/ 02 ноября 2018

Я пытаюсь загрузить большой CSV-файл, используя goroutines, используя Golang. Размерность CSV составляет (254882, 100). Но, используя мои программы, когда я анализирую CSV и сохраняю его в 2D-списке, я получаю строки меньше, чем 254882, и число меняется для каждого запуска. Я чувствую, что это происходит из-за рутин, но не могу указать причину. Кто-нибудь может мне помочь, пожалуйста. Я также новичок в Голанге. Вот мой код ниже

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

А моя основная функция выглядит следующим образом

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

Если кому-то тоже нужно скачать CSV, перейдите по ссылке ниже (485 МБ) https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Нет необходимости использовать *[][]float64, поскольку это будет двойной указатель.

Я сделал несколько небольших изменений в вашей программе.

dataset доступно для новой программы, поскольку она объявлена ​​в блоке кода выше.

Аналогично record также доступен, но поскольку переменная record время от времени меняется, нам нужно передать ее новой программе.

Хотя нет необходимости передавать dataset, так как он не меняется и это то, что мы хотим, чтобы мы могли добавить temp к dataset.

Но состояние гонки происходит, когда несколько goroutines пытаются добавить одну и ту же переменную, т.е. несколько goroutines пытаются записать в одну и ту же переменную.

Таким образом, мы должны убедиться, что в каждый момент времени может быть добавлена ​​только одна программа. Поэтому мы используем блокировку для последовательного добавления.

package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "os"
    "strconv"
    "sync"
)

func loadCSV(csvFile string) [][]float64 {
    var dataset [][]float64

    f, _ := os.Open(csvFile)

    r := csv.NewReader(f)

    var wg sync.WaitGroup
    l := new(sync.Mutex) // lock

    for record, err := r.Read(); err == nil; record, err = r.Read() {
        wg.Add(1)

        go func(record []string) {
            defer wg.Done()

            var temp []float64
            for _, each := range record {
                if f, err := strconv.ParseFloat(each, 64); err == nil {
                    temp = append(temp, f)
                }
            }
            l.Lock() // lock before writing
            dataset = append(dataset, temp) // write
            l.Unlock() // unlock

        }(record)
    }

    wg.Wait()

    return dataset
}

func main() {
    dataset := loadCSV("train.csv")
    fmt.Println(len(dataset))
}

Некоторые ошибки не были обработаны, чтобы сделать его минимальным, но вы должны обрабатывать ошибки.

0 голосов
/ 02 ноября 2018

Go Data Race Detector


Ваши результаты не определены, потому что у вас есть данные гонки.

~/gopath/src$ go run -race racer.go
==================
WARNING: DATA RACE
Write at 0x00c00008a060 by goroutine 6:
  runtime.mapassign_faststr()
      /home/peter/go/src/runtime/map_faststr.go:202 +0x0
  main.main.func2()
      /home/peter/gopath/src/racer.go:16 +0x6a

Previous write at 0x00c00008a060 by goroutine 5:
  runtime.mapassign_faststr()
      /home/peter/go/src/runtime/map_faststr.go:202 +0x0
  main.main.func1()
      /home/peter/gopath/src/racer.go:11 +0x6a

Goroutine 6 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:14 +0x88

Goroutine 5 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:9 +0x5b
==================
fatal error: concurrent map writes
==================
WARNING: DATA RACE
Write at 0x00c00009a088 by goroutine 6:
  main.main.func2()
      /home/peter/gopath/src/racer.go:16 +0x7f

Previous write at 0x00c00009a088 by goroutine 5:
  main.main.func1()
      /home/peter/gopath/src/racer.go:11 +0x7f

Goroutine 6 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:14 +0x88

Goroutine 5 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:9 +0x5b
==================

goroutine 34 [running]:
runtime.throw(0x49e156, 0x15)
    /home/peter/go/src/runtime/panic.go:608 +0x72 fp=0xc000094718 sp=0xc0000946e8 pc=0x44b342
runtime.mapassign_faststr(0x48ace0, 0xc00008a060, 0x49c9c3, 0x8, 0xc00009a088)
    /home/peter/go/src/runtime/map_faststr.go:211 +0x46c fp=0xc000094790 sp=0xc000094718 pc=0x43598c
main.main.func1(0x49c9c3, 0x8)
    /home/peter/gopath/src/racer.go:11 +0x6b fp=0xc0000947d0 sp=0xc000094790 pc=0x47ac6b
runtime.goexit()
    /home/peter/go/src/runtime/asm_amd64.s:1340 +0x1 fp=0xc0000947d8 sp=0xc0000947d0 pc=0x473061
created by main.main
    /home/peter/gopath/src/racer.go:9 +0x5c

goroutine 1 [sleep]:
time.Sleep(0x5f5e100)
    /home/peter/go/src/runtime/time.go:105 +0x14a
main.main()
    /home/peter/gopath/src/racer.go:19 +0x96

goroutine 35 [runnable]:
main.main.func2(0x49c9c3, 0x8)
    /home/peter/gopath/src/racer.go:16 +0x6b
created by main.main
    /home/peter/gopath/src/racer.go:14 +0x89
exit status 2
~/gopath/src$ 

racer.go:

package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "strconv"
    "sync"
    "time"
)

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("/home/peter/AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}
...