Go канал не работает для образца производителя / потребителя - PullRequest
0 голосов
/ 26 января 2019

Я только что установил Go на Mac, и вот код

package main

import (
    "fmt"
    "time"
)

func Product(ch chan<- int) {
    for i := 0; i < 100; i++ {
        fmt.Println("Product:", i)
        ch <- i
    }
}

func Consumer(ch <-chan int) {
    for i := 0; i < 100; i++ {
        a := <-ch
        fmt.Println("Consmuer:", a)
    }
}

func main() {
    ch := make(chan int, 1)
    go Product(ch)
    go Consumer(ch)
    time.Sleep(500)
}

Я "иди и запусти продюсера_consumer.go", на экране нет вывода, а затем он завершается.

Есть проблемы с моей программой? Как это исправить?

Ответы [ 3 ]

0 голосов
/ 26 января 2019

Я немного изменил (расширил время. Сплю) твой код.Прекрасно работает на моем Linux x86_64

func Product(ch chan<- int) {
    for i := 0; i < 10; i++ {
        fmt.Println("Product:", i)
        ch <- i
    }
}
func Consumer(ch <-chan int) {
    for i := 0; i < 10; i++ {
        a := <-ch
        fmt.Println("Consmuer:", a)
    }
}
func main() {
    ch := make(chan int, 1)
    go Product(ch)
    go Consumer(ch)
    time.Sleep(10000)
}

Вывод go run s1.go

Product: 0
Product: 1
Product: 2
0 голосов
/ 26 января 2019

Это довольно многословный ответ, но, проще говоря:

  • Использование time.Sleep для ожидания, пока , надеюсь, другие подпрограммы завершат свою работу, плохо.
  • Потребитель и производитель не должны ничего знать друг о друге, кроме типа, которым они обмениваются по каналу.Ваш код полагается как на потребителя, так и на производителя, зная, сколько целых будет передано.Нереалистичный сценарий
  • Каналы можно перебирать (представьте их как потокобезопасный общий раздел)
  • каналы должны быть закрыты

Внизуиз этого довольно многословного ответа, где я попытаюсь объяснить некоторые базовые концепции и лучшие практики (ну, лучшие практики), вы обнаружите, что ваш код переписан для работы и отображают все значения без опираясь на time.Sleep.Я не проверял этот код, но все должно быть в порядке


Да, здесь есть пара проблем.Так же, как в списке:

  1. Ваш канал буферизуется на 1, что нормально, но в этом нет необходимости
  2. Ваш канал никогда не закрывается
  3. You 'Вы ожидаете 500 нс, затем завершаете работу независимо от того, завершились ли процедуры или даже начали обработку в этом отношении.
  4. Централизованный контроль над подпрограммами отсутствует, после того, как вы их запустили, вы получаете 0 элементов управления.Если вы нажмете Ctrl + C, вы можете отменить подпрограммы при написании кода, который будет обрабатывать важные данные.Проверьте обработку сигнала и контекст для этого

Буфер канала

Поскольку вы уже знаете, сколько значений вы собираетесь выдвинуть на свой канал, почему бы просто не создать ch := make(chan int, 100)?Таким образом, ваш издатель может продолжать отправлять сообщения на канал, независимо от того, что делает потребитель.

Вам не нужно , чтобы сделать это, , но добавьте разумный буфер в ваш канал,в зависимости от того, что вы пытаетесь сделать, определенно стоит проверить.На данный момент, однако, обе процедуры используют fmt.Println & co, что в любом случае будет узким местом.Печать в STDOUT является поточно-ориентированной и буферизированной.Это означает, что каждый вызов fmt.Print* собирается получить блокировку, чтобы избежать объединения текста из обеих подпрограмм.

Закрытие канала

Вы можете просто вставить все значения в свойканал, а затем закройте его.Это, однако, плохая форма.Практическое правило WRT-каналов заключается в том, что каналы создаются и закрываются в одной и той же процедуре.Значение: вы создаете канал в основной подпрограмме, именно там он должен быть закрыт.

Вам необходим механизм для синхронизации или, по крайней мере, отслеживание того, выполнили ли ваши подпрограммы свою работу.Это делается с помощью пакета sync или через второй канал.

// using a done channel
func produce(ch chan<- int) <-chan struct{} {
    done := make(chan struct{})
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        // all values have been published
        // close done channel
        close(done)
    }()
    return done
}

func main() {
    ch := make(chan int, 1)
    done := produce(ch)
    go consume(ch)
    <-done // if producer has done its thing
    close(ch) // we can close the channel
}

func consume(ch <-chan int) {
    // we can now simply loop over the channel until it's closed
    for i := range ch {
        fmt.Printf("Consumed %d\n", i)
    }
}

ОК, но здесь вам все равно придется ждать завершения процедуры consume.

Возможно, вы уже заметили, что канал done технически не закрыт в той же подпрограмме, которая его создает.Поскольку процедура определяется как замыкание, это приемлемый компромисс.Теперь давайте посмотрим, как мы могли бы использовать группу ожидания:

import (
    "fmt"
    "sync"
)

func product(wg *sync.WaitGroup, ch chan<- int) {
    defer wg.Done() // signal we've done our job
    for i := 0; i < 100; i++ {
        ch <- i
    }
}

func main() {
    ch := make(chan int, 1)
    wg := sync.WaitGroup{}
    wg.Add(1) // I'm adding a routine to the channel
    go produce(&wg, ch)
    wg.Wait() // will return once `produce` has finished
    close(ch)
}

ОК, так что это выглядит многообещающе, я могу попросить подпрограммы сообщить мне, когда они закончили свои задачи.Но если я добавлю и потребителя, и производителя в группу ожидания, я не смогу просто перебрать канал.Канал закроется только в том случае, если обе подпрограммы вызовут wg.Done(), но если потребитель застрянет в цикле по каналу, который никогда не закроется, я создал тупик.

Решение:

Гибрид был бы самым простым решением на этом этапе: добавьте потребителя в группу ожидания и используйте готовый канал в производителе, чтобы получить:

func produce(ch chan<- int) <-chan struct{} {
    done := make(chan struct{})
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        close(done)
    }()
    return done
}

func consume(wg *sync.WaitGroup, ch <-chan int) {
    defer wg.Done()
    for i := range ch {
        fmt.Printf("Consumer: %d\n", i)
    }
}

func main() {
    ch := make(chan int, 1)
    wg := sync.WaitGroup{}
    done := produce(ch)
    wg.Add(1)
    go consume(&wg, ch)
    <- done // produce done
    close(ch)
    wg.Wait()
    // consumer done
    fmt.Println("All done, exit")
}
0 голосов
/ 26 января 2019

Как намекнул JimB, time.Sleep принимает time.Duration, а не целое число. godoc показывает пример того, как это правильно вызвать. В вашем случае вы, вероятно, хотите:

time.Sleep(500 * time.Millisecond)

Причина, по которой ваша программа быстро завершает работу (но не выдает ошибку), связана с (несколько неожиданным) способом реализации time.Duration.

time.Duration - это просто псевдоним типа для int64. Внутренне он использует значение для представления длительности в наносекундах. Когда вы вызываете time.Sleep(500), компилятор с радостью интерпретирует числовой литерал 500 как time.Duration. К сожалению, , что означает 500 нс .

time.Millisecond - это константа, равная числу наносекунд в миллисекунде (1 000 000). Хорошая вещь заключается в том, что требование сделать это умножение в явном виде делает очевидным для вызывающей стороны, каковы единицы этого аргумента. К сожалению, time.Sleep(500) - это совершенно правильный код go, но он не выполняет то, что ожидают большинство новичков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...