Чтение с небуферизованных каналов - PullRequest
0 голосов
/ 24 марта 2019

Я пытаюсь понять небуферизованные каналы, поэтому я написал небольшое приложение, которое перебирает массив пользовательского ввода, выполняет некоторую работу, помещает информацию в небуферизованный канал и затем читает его. Однако я не могу читать с каналов. Это мой код

toProcess := os.Args[1:]

var wg sync.WaitGroup
results := make(chan string)
errs := make(chan error)

for _, t := range toProcess {
    wg.Add(1)
    go Worker(t, "text", results, errs, &wg)
}


go func() {
    for err := range errs {
        if err != nil {
            fmt.Println(err)
        }
    }
}()


go func() {
    for res := range results {
        fmt.Println(res)
    }
}()

Что я не понимаю о не буферизованных каналах? Я подумал, что мне следует разместить на нем информацию и сделать еще одно обычное чтение.

РЕДАКТИРОВАТЬ: использование двух программ решает проблемы, но все равно дает мне следующее при ошибках:

open /Users/roosingh/go/src/github.com/nonbuff/files/22.txt: no such file or directory
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42001416c)
    /usr/local/Cellar/go/1.10.2/libexec/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc420014160)
    /usr/local/Cellar/go/1.10.2/libexec/src/sync/waitgroup.go:129 +0x72
main.main()
    /Users/roosingh/go/src/github.com/nonbuff/main.go:39 +0x207

goroutine 6 [chan receive]:
main.main.func1(0xc4200780c0)
    /Users/roosingh/go/src/github.com/nonbuff/main.go:25 +0x41
created by main.main
    /Users/roosingh/go/src/github.com/nonbuff/main.go:24 +0x1d4

goroutine 7 [chan receive]:
main.main.func2(0xc420078060)
    /Users/roosingh/go/src/github.com/nonbuff/main.go:34 +0xb2
created by main.main
    /Users/roosingh/go/src/github.com/nonbuff/main.go:33 +0x1f6

Таким образом, он может распечатать сообщение об ошибке. Мой рабочий код выглядит следующим образом:

func Worker(fn string, text string, results chan string, errs chan error, wg *sync.WaitGroup) {
    file, err := os.Open(fn)
    if err != nil {
        errs <- err
        return
    }
    defer func() {
        file.Close()
        wg.Done()
    }()

    reader := bufio.NewReader(file)


    for {
        var buffer bytes.Buffer

        var l []byte
        var isPrefix bool
        for {
            l, isPrefix, err = reader.ReadLine()
            buffer.Write(l)

            if !isPrefix {
                break
            }

            if err != nil {
                errs <- err
                return

            }
        }

        if err == io.EOF {
            return
        }

        line := buffer.String()

        results <- fmt. Sprintf("%s, %s", line, text)

    }

    if err != io.EOF {
        errs <- err
        return
    }

    return
}

1 Ответ

4 голосов
/ 24 марта 2019

Что касается небуферизованных каналов, вы, кажется, понимаете концепцию, то есть она используется для передачи сообщений между программами, но не может их содержать. Поэтому запись в небуферизованном канале будет блокироваться до тех пор, пока другая программа не будет считывать данные с канала, а чтение из канала будет блокироваться до тех пор, пока другая программа не выполнит запись в этот канал.

В вашем случае вы, кажется, хотите читать с 2 каналов одновременно в одной и той же программе. Так как каналы работают, вы не можете располагаться на незакрытом канале и дальше в том же диапазоне подпрограмм на другом канале. Пока первый канал не закроется, вы не достигнете второго диапазона.

Но это не значит, что это невозможно! Вот где приходит оператор select.

Оператор select позволяет выборочно читать по нескольким каналам, что означает, что он будет читать первый, который имеет что-то доступное для чтения.

Имея это в виду, вы можете использовать for в сочетании с select и переписать свою программу следующим образом:

go func() {
    for {
        select {
            case err := <- errs: // you got an error
                fmt.Println(err) 
            case res := <- results: // you got a result
                fmt.Println(res)
        }
    }
}()

Кроме того, вам здесь не нужна группа ожидания, потому что вы знаете, сколько рабочих вы запускаете, вы можете просто посчитать, сколько ошибок и результатов вы получите и остановить, когда достигнете числа рабочих.

Пример:

go func() {
    var i int
    for {
        select {
            case err := <- errs: // you got an error
                fmt.Println(err)
                i++
            case res := <- results: // you got a result
                fmt.Println(res)
                i++
        }
        // all our workers are done
        if i == len(toProcess) {
            return 
        }
    }
}()
...