Golang шаблон, чтобы убить несколько goroutines одновременно - PullRequest
0 голосов
/ 03 апреля 2020

У меня есть две программы, как показано ниже. Я хочу синхронизировать их так, чтобы, когда один возвращается, другой также должен был выйти. Каков наилучший способ для достижения этой цели в go?

func main() {

  go func() {
    ...
    if err != nil {
      return
    }
  }()

  go func() {
    ...
    if err != nil {
      return
    }
  }()


}

Я смоделировал этот сценарий здесь https://play.golang.org/p/IqawStXt7rt и попытался решить его с помощью канала, сигнализирующего о выполнении процедуры. , Похоже, что запись в закрытый канал может привести к пани c. Как лучше всего решить эту проблему?

Ответы [ 5 ]

3 голосов
/ 03 апреля 2020

Вы можете использовать контекст для связи между двумя go подпрограммами. Например,

package main

import (
    "context"
    "sync"
)

func main() {

    ctx, cancel := context.WithCancel(context.Background())
    wg := sync.WaitGroup{}
    wg.Add(3)
    go func() {
        defer wg.Done()
        for {
            select {
            // msg from other goroutine finish
            case <-ctx.Done():
                // end
            }
        }
    }()

    go func() {
        defer wg.Done()
        for {
            select {
            // msg from other goroutine finish
            case <-ctx.Done():
                // end
            }
        }
    }()

    go func() {
        defer wg.Done()
        // your operation
        // call cancel when this goroutine ends
        cancel()
    }()
    wg.Wait()
}

1 голос
/ 05 апреля 2020

Сначала разделите ожидание на go -программах и done -канале.

Используйте sync.WaitGroup для координации подпрограмм.

func main() {
    wait := &sync.WaitGroup{}
    N := 3

    wait.Add(N)
    for i := 1; i <= N; i++ {
        go goFunc(wait, i, true)
    }

    wait.Wait()
    fmt.Println(`Exiting main`)
}

Каждая подпрограмма будет выглядеть следующим образом это:

// code for the actual goroutine
func goFunc(wait *sync.WaitGroup, i int, closer bool) {
    defer wait.Done()
    defer fmt.Println(`Exiting `, i)

    T := time.Tick(time.Duration(100*i) * time.Millisecond)
    for {
        select {
        case <-T:
            fmt.Println(`Tick `, i)
            if closer {
                return
            }
        }
    }
}

(https://play.golang.org/p/mDO4P56lzBU)

Наше основное веселье c - это успешное ожидание выхода из программы перед выходом. Каждая программа закрывается сама, и мы хотим, чтобы все наши программы были отменены одновременно.

Мы сделаем это с chan и воспользуемся этой функцией получения из каналов:

QUOTE: Операция приема на закрытом канале всегда может быть выполнена немедленно, давая нулевое значение типа элемента после получения любых ранее отправленных значений. (https://golang.org/ref/spec#Receive_operator)

Мы изменяем наши процедуры для проверки на ЗАКРЫТИЕ:

func goFunc(wait *sync.WaitGroup, i int, closer bool, CLOSE chan struct{}) {
    defer wait.Done()
    defer fmt.Println(`Exiting `, i)

    T := time.Tick(time.Duration(100*i) * time.Millisecond)
    for {
        select {
        case <-CLOSE:
            return
        case <-T:
            fmt.Println(`Tick `, i)
            if closer {
                close(CLOSE)
            }
        }
    }
}

, а затем мы изменяем наш func main, чтобы он проходил ЗАКРЫТЬ через канал, и мы установим переменную closer так, чтобы только последняя из наших программ запустила закрытие:

func main() {
    wait := &sync.WaitGroup{}
    N := 3
    CLOSE := make(chan struct{})

    // Launch the goroutines
    wait.Add(N)
    for i := 1; i <= N; i++ {
        go goFunc(wait, i, i == N, CLOSE)
    }

    // Wait for the goroutines to finish
    wait.Wait()
    fmt.Println(`Exiting main`)
}

(https://play.golang.org/p/E91CtRAHDp2)

Теперь похоже, что все работает.

Но это не так. Параллельность сложна. В этом коде скрывается ошибка, просто ожидающая, чтобы укусить вас в работе. Давайте рассмотрим это.

Измените наш пример так, чтобы каждая процедура закрывалась:

func main() {
    wait := &sync.WaitGroup{}
    N := 3
    CLOSE := make(chan struct{})

    // Launch the goroutines
    wait.Add(N)
    for i := 1; i <= N; i++ {
        go goFunc(wait, i, true /*** EVERY GOROUTINE WILL CLOSE ***/, CLOSE)
    }

    // Wait for the goroutines to finish
    wait.Wait()
    fmt.Println(`Exiting main`)
}

Измените процедуру так, чтобы до закрытия потребовалось некоторое время. Мы хотим, чтобы две программы закрывались одновременно:

// code for the actual goroutine
func goFunc(wait *sync.WaitGroup, i int, closer bool, CLOSE chan struct{}) {
    defer wait.Done()
    defer fmt.Println(`Exiting `, i)

    T := time.Tick(time.Duration(100*i) * time.Millisecond)
    for {
        select {
        case <-CLOSE:
            return
        case <-T:
            fmt.Println(`Tick `, i)
            if closer {
                /*** TAKE A WHILE BEFORE CLOSING ***/
                time.Sleep(time.Second)
                close(CLOSE)
            }
        }
    }
}


(https://play.golang.org/p/YHnbDpnJCks)

Мы производим sh с:

Tick  1
Tick  2
Tick  3
Exiting  1
Exiting  2
panic: close of closed channel

goroutine 7 [running]:
main.goFunc(0x40e020, 0x2, 0x68601, 0x430080)
    /tmp/sandbox558886627/prog.go:24 +0x2e0
created by main.main
    /tmp/sandbox558886627/prog.go:38 +0xc0

Program exited: status 2.

Хотя прием по закрытому каналу немедленно возвращается, вы не можете закрыть закрытый канал.

Нам нужно немного координации. Мы можем сделать это с помощью sync.Mutex и bool, чтобы указать, закрыли ли мы канал или нет. Давайте создадим структуру, чтобы сделать это:

type Close struct {
    C chan struct{}
    l sync.Mutex
    closed bool
}

func NewClose() *Close {
    return &Close {
        C: make(chan struct{}),
    }
}

func (c *Close) Close() {
    c.l.Lock()
    if (!c.closed) {
        c.closed=true
        close(c.C)
    }
    c.l.Unlock()
}

Перепишите наш gofun c и наше основное, чтобы использовать нашу новую структуру Close, и мы готовы к go: https://play.golang.org/p/eH3djHu8EXW

Проблема с параллелизмом заключается в том, что вам всегда нужно задаться вопросом, что произойдет, если в коде есть еще один «поток».

1 голос
/ 03 апреля 2020

Используйте закрытие на канале, чтобы сигнализировать о завершении. Это позволяет нескольким процедурам проверять выполнение, получая по каналу.

Использовать один канал для каждой процедуры, чтобы сообщить о завершении процедуры.

done1 := make(chan struct{}) // closed when goroutine 1 returns
done2 := make(chan struct{}) // closed when goroutine 2 returns

go func() {
    defer close(done1)

    timer1 := time.NewTicker(1 * time.Second)
    defer timer1.Stop()

    timer2 := time.NewTicker(2 * time.Second)
    defer timer2.Stop()

    for {
        select {
        case <-done2:
            // The other goroutine returned.
            fmt.Println("done func 1")
            return
        case <-timer1.C:
            fmt.Println("timer1 func 1")
        case <-timer2.C:
            fmt.Println("timer2 func 1")
            return
        }

    }
}()

go func() {
    defer close(done2)
    for {
        select {
        case <-done1:
            // The other goroutine returned.
            fmt.Println("done func 2")
            return
        default:
            time.Sleep(3 * time.Second)
            fmt.Println("sleep done from func 2")
            return
        }

    }
}()

fmt.Println("waiting for goroutines to complete")

// Wait for both goroutines to return. The order that
// we wait here does not matter. 
<-done1
<-done2

fmt.Println("all done")

Запустить его на детской площадке .

0 голосов
/ 03 апреля 2020

Ваша проблема в том, что вы хотите, чтобы одиночный сигнал отправлялся по каналу DONE для приема несколькими слушателями. Вам также необходимо подумать о том, получена ли отправка по каналу done вашими подпрограммами или вашими main забавными c.

Я предлагаю вам скорее отделить ожидания от подпрограмм go и done канал.

import `sync`

// This code will wait for the two functions to complete before ending
func main {
   var wait sync.WaitGroup
   wait.Add(2)
   go func() {
     defer wait.Done()
   }()
   go g() {
     defer wait.Done()
   }()
   wait.Wait()
}

Теперь, как управлять Готово. Что ж, решение состоит в том, чтобы использовать sync.Cond, и каждая процедура запускает свою собственную процедуру ожидания на Cond. Вот пример:

package main

import (
    `fmt`
    `sync`
    `time`
)

// WaitForIt wraps a Cond and a Mutex for a simpler API:
// .WAIT() chan struct{} will return a channel that will be
//   signalled when the WaitForIt is done.
// .Done() will indicate that the WaitForIt is done.
type WaitForIt struct {
    L *sync.Mutex
    Cond *sync.Cond
}

func NewWaitForIt() *WaitForIt {
    l := &sync.Mutex{}
    c := sync.NewCond(l)
    return &WaitForIt{ l, c }
}

// WAIT returns a chan that will be signalled when
// the Cond is triggered.
func (w *WaitForIt) WAIT() chan struct{} {
    D := make(chan struct{})
    go func() {
        w.L.Lock()
        defer w.L.Unlock()
        w.Cond.Wait()
        D <- struct{}{}
        close(D)
    }()
    return D
}

// Done indicates that the Cond should be triggered.
func (w *WaitForIt) Done() {
    w.Cond.Broadcast()
}

// doneFunc launches the func f with a chan that will be signalled when the
// func should stop. It also handles WaitGroup synchronization
func doneFunc(wait *sync.WaitGroup, waitForIt *WaitForIt, f func(DONE chan struct{})) {
    defer wait.Done()
    f(waitForIt.WAIT())
}

func main() {
    // wait will coordinate all the goroutines at the level of main()
    // between themselves the waitForIt will do the coordination
    wait := &sync.WaitGroup{}
    // waitForIt indicates to the goroutines when they should shut
    waitForIt := NewWaitForIt()

    // goFunc generates each goroutine. Only the 3-second goroutine will 
    // shutdown all goroutines
    goFunc := func(seconds int) func(chan struct{}) {
        return func(DONE chan struct{}) {
            // this is the actual code of each goroutine
            // it makes a ticker for a number of seconds,
            // and prints the seconds after the ticker elapses,
            // or exits if DONE is triggered
            timer := time.NewTicker(time.Duration(seconds) * time.Second)
            defer timer.Stop()
            for {
                select {
                case <- DONE:
                    return
                case <- timer.C:
                    if (3==seconds) {
                        waitForIt.Done()
                        // Don't shutdown here - we'll shutdown
                        // when our DONE is signalled
                    }
                }
            }
        }
    }
    // launch 3 goroutines, each waiting on a shutdown signal
    for i:=1; i<=3; i++ {
        wait.Add(1)
        go doneFunc(wait, waitForIt, goFunc(i))
    }
    // wait for all the goroutines to complete, and we're done
    wait.Wait()
}

Вот ваш пример, реализованный с использованием WaitForIt: https://play.golang.org/p/llphW73G1xE Обратите внимание, что мне пришлось удалить вызов Lock() в WaitForIt.Done. Хотя в документации говорится, что вам разрешено удерживать блокировку, она блокирует выполнение 2-й процедуры.

0 голосов
/ 03 апреля 2020
package main

import (
    "fmt"
    "sync"
    "time"
)

func func1(done chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    timer1 := time.NewTicker(1 * time.Second)
    timer2 := time.NewTicker(2 * time.Second)
    for {
        select {
        case <-timer1.C:
            fmt.Println("timer1 func 1")
        case <-timer2.C:
            // Ask GC to sweep the tickers timer1, timer2
            // as goroutine should return
            timer1.Stop()
            timer2.Stop()

            fmt.Println("timer2 func 1")

            done <- struct{}{} // Signal the other goroutine to terminate

            fmt.Println("sent done from func 1")
            return
        case <-done:
            // Ask GC to sweep the tickers timer1, timer2
            // as goroutine should return
            timer1.Stop()
            timer2.Stop()

            fmt.Println("done func 1")
            return

        }

    }
}

func func2(done chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    timer3 := time.NewTicker(3 * time.Second)
    for {
        select {
        case <-timer3.C:
            // Ask GC to sweep the tickers timer3
            // as goroutine should return
            timer3.Stop()

            fmt.Println("timer3 func 2")

            done <- struct{}{} // Signal the other goroutine to terminate

            fmt.Println("sent done from func 2")
            return
        case <-done:
            // Ask GC to sweep the tickers timer3
            // as goroutine should return
            timer3.Stop()
            fmt.Println("done func 2")
            return
        }

    }
}

func main() {
    // Chan used for signalling between goroutines
    done := make(chan struct{})

    // WaitGroup
    wg := sync.WaitGroup{}

    wg.Add(2)

    // Spawn the goroutine for func1
    go func1(done, &wg)
    // Spawn the goroutine for func2
    go func2(done, &wg)

    fmt.Println("starting sleep")

    // Wait for the goroutines
    wg.Wait()

    // Wait for 15 seconds
    // If not required, please remove
    // the lines below
    time.Sleep(15 * time.Second)
    fmt.Println("waited 15 seconds")

}

...