У меня куча горутинов, которые делают что-то в цикле. Я хочу иметь возможность приостановить все из них, запустить произвольный код, а затем возобновить их. Способ, которым я пытался сделать это, вероятно, не идиоматичен (и я был бы признателен за лучшее решение), но я не могу понять, почему оно не работает.
Разобраться до самого необходимого (код драйвера внизу):
type looper struct {
pause chan struct{}
paused sync.WaitGroup
resume chan struct{}
}
func (l *looper) loop() {
for {
select {
case <-l.pause:
l.paused.Done()
<-l.resume
default:
dostuff()
}
}
}
func (l *looper) whilePaused(fn func()) {
l.paused.Add(32)
l.resume = make(chan struct{})
close(l.pause)
l.paused.Wait()
fn()
l.pause = make(chan struct{})
close(l.resume)
}
Я запускаю 32 горутина, все бегут loop()
, затем звоню whilePaused
100 раз подряд, и все, кажется, работает ... но если я запускаю это с -race
, это говорит мне, что на * есть гонка l.resume
между записью в whilePaused
(l.resume = make(chan struct{})
) и чтением в loop
(<-l.resume
).
Я не понимаю, почему это происходит. Согласно Модель памяти Go , что close(l.pause)
должно происходить до <-l.pause
в каждой loop
программе. Это должно означать, что значение make(chan struct{})
отображается как значение l.resume
во всех этих loop
процедурах, точно так же, как строка "hello world"
отображается как значение a
в f
. в примере с документами.
Некоторая дополнительная информация, которая может иметь отношение к теме:
Если я заменим l.resume
на unsafe.Pointer
и получу доступ к значению chan struct{}
с atomic.LoadPointer
в loop
и atomic.StorePointer
в whilePaused
, гонка уйдет. Похоже, это обеспечивает точно такой же порядок получения-выпуска, который канал уже должен предоставить?
Если я добавлю time.Sleep(10 * time.Microsecond)
между l.paused.Done()
и <-l.resume
, программа обычно блокируется после вызова fn
один или два раза.
Если вместо этого добавить fmt.Printf(".")
, программа напечатает 28 .
с, вызовет первую функцию, напечатает еще 32 .
с, затем зависнет (или, иногда, вызывает вторую функцию, затем печатает еще 32 .
с и зависает).
Вот остальная часть моего кода, на случай, если вы захотите запустить все это:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// looper code from above
var n int64
func dostuff() {
atomic.AddInt64(&n, 1)
}
func main() {
l := &looper{
pause: make(chan struct{}),
}
var init sync.WaitGroup
init.Add(32)
for i := 0; i < 32; i++ {
go func() {
init.Done()
l.loop()
}()
}
init.Wait()
for i := 0; i < 100; i++ {
l.whilePaused(func() { fmt.Printf("%d ", i) })
}
fmt.Printf("\n%d\n", atomic.LoadInt64(&n))
}