Гонка останавливает группу горутин - PullRequest
0 голосов
/ 09 мая 2018

У меня куча горутинов, которые делают что-то в цикле. Я хочу иметь возможность приостановить все из них, запустить произвольный код, а затем возобновить их. Способ, которым я пытался сделать это, вероятно, не идиоматичен (и я был бы признателен за лучшее решение), но я не могу понять, почему оно не работает.

Разобраться до самого необходимого (код драйвера внизу):

type looper struct {
    pause  chan struct{}
    paused sync.WaitGroup
    resume chan struct{}
}

func (l *looper) loop() {
    for {
        select {
        case <-l.pause:
            l.paused.Done()
            <-l.resume
        default:
            dostuff()
        }
    }
}

func (l *looper) whilePaused(fn func()) {
    l.paused.Add(32)
    l.resume = make(chan struct{})
    close(l.pause)
    l.paused.Wait()
    fn()
    l.pause = make(chan struct{})
    close(l.resume)
}

Я запускаю 32 горутина, все бегут loop(), затем звоню whilePaused 100 раз подряд, и все, кажется, работает ... но если я запускаю это с -race, это говорит мне, что на * есть гонка l.resume между записью в whilePaused (l.resume = make(chan struct{})) и чтением в loop (<-l.resume).

Я не понимаю, почему это происходит. Согласно Модель памяти Go , что close(l.pause) должно происходить до <-l.pause в каждой loop программе. Это должно означать, что значение make(chan struct{}) отображается как значение l.resume во всех этих loop процедурах, точно так же, как строка "hello world" отображается как значение a в f. в примере с документами.


Некоторая дополнительная информация, которая может иметь отношение к теме:

  • Если я заменим l.resume на unsafe.Pointer и получу доступ к значению chan struct{} с atomic.LoadPointer в loop и atomic.StorePointer в whilePaused, гонка уйдет. Похоже, это обеспечивает точно такой же порядок получения-выпуска, который канал уже должен предоставить?

  • Если я добавлю time.Sleep(10 * time.Microsecond) между l.paused.Done() и <-l.resume, программа обычно блокируется после вызова fn один или два раза.

  • Если вместо этого добавить fmt.Printf("."), программа напечатает 28 . с, вызовет первую функцию, напечатает еще 32 . с, затем зависнет (или, иногда, вызывает вторую функцию, затем печатает еще 32 . с и зависает).


Вот остальная часть моего кода, на случай, если вы захотите запустить все это:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// looper code from above

var n int64    
func dostuff() {
    atomic.AddInt64(&n, 1)
}

func main() {
    l := &looper{
        pause: make(chan struct{}),
    }
    var init sync.WaitGroup
    init.Add(32)
    for i := 0; i < 32; i++ {
        go func() {
            init.Done()
            l.loop()
        }()
    }
    init.Wait()
    for i := 0; i < 100; i++ {
        l.whilePaused(func() { fmt.Printf("%d ", i) })
    }
    fmt.Printf("\n%d\n", atomic.LoadInt64(&n))
}

1 Ответ

0 голосов
/ 09 мая 2018

Это потому, что после того, как поток выполнит l.paused.Done (), другой поток сможет обойти цикл и снова назначить l.resume

Вот последовательность операций

Looper thread    |    Pauser thread
------------------------------------
l.paused.Done()  |   
                 |   l.paused.Wait()
                 |   l.pause = make(chan struct{})
                 |   round the loop
                 |   l.paused.Add(numThreads)
<- l.resume      |   l.resume = make(chan struct{})   !!!RACE!!
...