Несколько производителей, один потребитель: все рутины спят - тупик - PullRequest
1 голос
/ 11 ноября 2019

Я следовал схеме проверки, есть ли что-нибудь в канале, прежде чем приступить к работе:

func consume(msg <-chan message) {
  for {
    if m, ok := <-msg; ok {
      fmt.Println("More messages:", m)
    } else {
      break
    }
  }
}

, которая основана на этом видео . Вот мой полный код:

package main

import (
    "fmt"
    "strconv"
    "strings"
    "sync"
)

type message struct {
    body string
    code int
}

var markets []string = []string{"BTC", "ETH", "LTC"}

// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
    // for i := 0; i < n; i++ {
    var msgToSend = message{
        body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
        code: 1,
    }
    fmt.Println("Producing:", msgToSend)
    msg <- msgToSend
    // }
    wg.Done()
}

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for {
        if m, ok := <-msg; ok {
            fmt.Println("Received:", m)
        } else {
            fmt.Println("Breaking from receiving")
            break
        }
    }
    wg.Done()
}

func main() {
    wg := sync.WaitGroup{}
    msgC := make(chan message, 100)
    defer func() {
        close(msgC)
    }()
    for ix, market := range markets {
        wg.Add(1)
        go produce(ix+1, market, msgC, &wg)
    }
    wg.Add(1)
    go receive(msgC, &wg)
    wg.Wait()
}

Если вы попытаетесь запустить его, мы получим тупик в самом конце, прежде чем напечатать сообщение, которое мы собираемся сломать. Это имеет смысл, так как в прошлый раз, когда в канале больше ничего нет, мы пытаемся извлечь значение, и мы получаем эту ошибку. Но тогда этот шаблон не работает if m, ok := <- msg; ok. Как заставить этот код работать и почему я получаю эту ошибку взаимоблокировки (предположительно, этот шаблон должен работать?).

Ответы [ 3 ]

1 голос
/ 11 ноября 2019

Учитывая, что у вас есть несколько писателей на одном канале, у вас есть небольшая проблема, потому что простой способ сделать это в Go в целом - это иметь одного писателя на одном канале, а затем иметь этот единственныйписатель закрывает канал после отправки последнего значения:

func produce(... args including channel) {
    defer close(ch)
    for stuff_to_produce {
        ch <- item
    }
}

Этот шаблон обладает хорошим свойством, что независимо от того, как вы выходите из produce, канал закрывается, сигнализируя об окончании производства.

Вы не используете этот шаблон - вы доставляете один канал многим подпрограммам, каждая из которых может отправить одно сообщение - поэтому вам нужно переместить close (или, конечно, использоватьеще какой-то другой паттерн). Простейший способ выразить нужный вам шаблон - это:

func overall_produce(... args including channel ...) {
    var pg sync.WaitGroup
    defer close(ch)
    for stuff_to_produce {
        pg.Add(1)
        go produceInParallel(ch, &pg) // add more args if appropriate
    }
    pg.Wait()
}

Счетчик pg накапливает активных производителей. Каждый должен позвонить pg.Done(), чтобы указать, что это сделано с помощью ch. Общий продюсер теперь ждет, когда все они будут выполнены, затем it закрывает канал на своем выходе.

(Если вы пишете внутреннюю функцию produceInParallel как замыкание, вы неВам не нужно явно передавать ch и pg. Вы также можете написать overallProducer как замыкание.)

Обратите внимание, что цикл вашего единственного потребителя, вероятно, лучше всего выражается с помощью конструкции for ... range:

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for m := range msg {
        fmt.Println("Received:", m)
    }
    wg.Done()
}

(Вы упомянули намерение добавить select в цикл, чтобы вы могли выполнять другие вычисления, если сообщение еще не готово. Если этот код не может быть выделен в независимые программына самом деле вам понадобится более причудливая конструкция m, ok := <-msg.

Обратите также внимание, что wg для receive - который может оказаться ненужным, в зависимости от того, как вы структурируете другие вещи, - довольнонезависимо от группы ожидания pg для производителей. Несмотря на то, что, как написано, потребитель не может сделать это до тех пор, пока не будут завершены все производители, мы хотели бы подождать, пока производители не будут готовы, чтобы мы могли закрыть канал в оболочке общего производителя.

0 голосов
/ 11 ноября 2019

Только когда возвращается main, вы можете close(msgC), но пока receive ожидает сигнала close, поэтому возникает DeadLock. После создания сообщений закройте канал.

package main

import (
    "fmt"
    "strconv"
    "strings"
    "sync"
)

type message struct {
    body string
    code int
}

var markets []string = []string{"BTC", "ETH", "LTC"}

// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
    // for i := 0; i < n; i++ {
    var msgToSend = message{
        body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
        code: 1,
    }
    fmt.Println("Producing:", msgToSend)
    msg <- msgToSend
    // }
    wg.Done()
}

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for {
        if m, ok := <-msg; ok {
            fmt.Println("Received:", m)
        } else {
            fmt.Println("Breaking from receiving")
            break
        }
    }
    wg.Done()
}

func main() {
    wg := sync.WaitGroup{}
    msgC := make(chan message, 100)
    // defer func() {
    //  close(msgC)
    // }()
    for ix, market := range markets {
        wg.Add(1)
        go produce(ix+1, market, msgC, &wg)
    }
    wg.Wait() // wait for producer
    close(msgC)
    wg.Add(1)
    go receive(msgC, &wg)
    wg.Wait()
}
0 голосов
/ 11 ноября 2019

Попробуйте этот код, я сделал несколько исправлений, которые заставили его работать:

package main

import (
    "fmt"
    "strconv"
    "strings"
    "sync"
)

type message struct {
    body string
    code int
}

var markets []string = []string{"BTC", "ETH", "LTC"}

// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
    // for i := 0; i < n; i++ {
    var msgToSend = message{
        body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
        code: 1,
    }
    fmt.Println("Producing:", msgToSend)
    msg <- msgToSend
    // }

}

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for {
        if m, ok := <-msg; ok {
            fmt.Println("Received:", m)
            wg.Done()
        }
    }
}

func consume(msg <-chan message) {
  for {
    if m, ok := <-msg; ok {
      fmt.Println("More messages:", m)
    } else {
      break
    }
  }
}

func main() {
    wg := sync.WaitGroup{}
    msgC := make(chan message, 100)
    defer func() {
        close(msgC)
    }()
    for ix, market := range markets {
        wg.Add(1)
        go produce(ix+1, market, msgC, &wg)
    }

    go receive(msgC, &wg)
    wg.Wait()
    fmt.Println("Breaking from receiving")
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...