Учитывая, что у вас есть несколько писателей на одном канале, у вас есть небольшая проблема, потому что простой способ сделать это в Go в целом - это иметь одного писателя на одном канале, а затем иметь этот единственныйписатель закрывает канал после отправки последнего значения:
func produce(... args including channel) {
defer close(ch)
for stuff_to_produce {
ch <- item
}
}
Этот шаблон обладает хорошим свойством, что независимо от того, как вы выходите из produce
, канал закрывается, сигнализируя об окончании производства.
Вы не используете этот шаблон - вы доставляете один канал многим подпрограммам, каждая из которых может отправить одно сообщение - поэтому вам нужно переместить close
(или, конечно, использоватьеще какой-то другой паттерн). Простейший способ выразить нужный вам шаблон - это:
func overall_produce(... args including channel ...) {
var pg sync.WaitGroup
defer close(ch)
for stuff_to_produce {
pg.Add(1)
go produceInParallel(ch, &pg) // add more args if appropriate
}
pg.Wait()
}
Счетчик pg
накапливает активных производителей. Каждый должен позвонить pg.Done()
, чтобы указать, что это сделано с помощью ch
. Общий продюсер теперь ждет, когда все они будут выполнены, затем it закрывает канал на своем выходе.
(Если вы пишете внутреннюю функцию produceInParallel
как замыкание, вы неВам не нужно явно передавать ch
и pg
. Вы также можете написать overallProducer
как замыкание.)
Обратите внимание, что цикл вашего единственного потребителя, вероятно, лучше всего выражается с помощью конструкции for ... range
:
func receive(msg <-chan message, wg *sync.WaitGroup) {
for m := range msg {
fmt.Println("Received:", m)
}
wg.Done()
}
(Вы упомянули намерение добавить select
в цикл, чтобы вы могли выполнять другие вычисления, если сообщение еще не готово. Если этот код не может быть выделен в независимые программына самом деле вам понадобится более причудливая конструкция m, ok := <-msg
.
Обратите также внимание, что wg
для receive
- который может оказаться ненужным, в зависимости от того, как вы структурируете другие вещи, - довольнонезависимо от группы ожидания pg
для производителей. Несмотря на то, что, как написано, потребитель не может сделать это до тех пор, пока не будут завершены все производители, мы хотели бы подождать, пока производители не будут готовы, чтобы мы могли закрыть канал в оболочке общего производителя.