Как согласовать отключение со многими горутинками - PullRequest
0 голосов
/ 06 февраля 2019

Скажем, у меня есть функция

type Foo struct {}

func (a *Foo) Bar() {
    // some expensive work - does some calls to redis
}

, которая выполняется в рамках программы в какой-то момент в моем приложении.Многие из них могут выполняться в любой момент.До завершения работы приложения я хотел бы убедиться, что все остальные программы завершили свою работу.

Могу ли я сделать что-то вроде этого:

type Foo struct {
    wg sync.WaitGroup
}

func (a *Foo) Close() {
    a.wg.Wait()
}

func (a *Foo) Bar() {
    a.wg.Add(1)
    defer a.wg.Done()

    // some expensive work - does some calls to redis
}

Предполагая, что Bar выполняется в программе имногие из них могут быть запущены в определенное время, и этот Bar не следует вызывать после вызова Close и вызова Close по сигнтерме или sigint.

Имеет ли это смысл?

Обычно ябудет выглядеть функция бара следующим образом:

func (a *Foo) Bar() {
    a.wg.Add(1)

    go func() {
        defer a.wg.Done()
        // some expensive work - does some calls to redis
    }()
}

Ответы [ 4 ]

0 голосов
/ 08 февраля 2019

Шаблон, который я использую много: https://play.golang.org/p/ibMz36TS62z

package main

import (
    "fmt"
    "sync"
    "time"
)

type response struct {
    message string
}

func task(i int, done chan response) {
    time.Sleep(1 * time.Second)
    done <- response{fmt.Sprintf("%d done", i)}
}

func main() {

    responses := GetResponses(10)

    fmt.Println("all done", len(responses))
}

func GetResponses(n int) []response {
    donequeue := make(chan response)
    wg := sync.WaitGroup{}
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(value int) {
            defer wg.Done()
            task(value, donequeue)
        }(i)
    }
    go func() {
        wg.Wait()
        close(donequeue)
    }()
    responses := []response{}
    for result := range donequeue {
        responses = append(responses, result)
    }

    return responses
}

, это также облегчает регулирование: https://play.golang.org/p/a4MKwJKj634

package main

import (
    "fmt"
    "sync"
    "time"
)

type response struct {
    message string
}

func task(i int, done chan response) {
    time.Sleep(1 * time.Second)
    done <- response{fmt.Sprintf("%d done", i)}
}

func main() {

    responses := GetResponses(10, 2)

    fmt.Println("all done", len(responses))
}

func GetResponses(n, concurrent int) []response {

    throttle := make(chan int, concurrent)
    for i := 0; i < concurrent; i++ {
        throttle <- i
    }
    donequeue := make(chan response)
    wg := sync.WaitGroup{}
    for i := 0; i < n; i++ {
        wg.Add(1)
        <-throttle
        go func(value int) {
            defer wg.Done()
            throttle <- 1
            task(value, donequeue)
        }(i)
    }
    go func() {
        wg.Wait()
        close(donequeue)
    }()
    responses := []response{}
    for result := range donequeue {
        responses = append(responses, result)
    }

    return responses
}
0 голосов
/ 06 февраля 2019

WaitGroup - это один из способов, однако команда Go представила errgroup для вашего случая использования.Самая неудобная часть ответа листового бибопа - игнорирование обработки ошибок.Обработка ошибок - причина существования errgroup.Идиоматический код go не должен никогда глотать ошибки.

Однако, сохраняя подписи вашей Foo структуры (кроме косметической workerNumber) - и не обрабатывая ошибок - мое предложение выглядит так:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "golang.org/x/sync/errgroup"
)

type Foo struct {
    errg errgroup.Group
}

func NewFoo() *Foo {
    foo := &Foo{
        errg: errgroup.Group{},
    }
    return foo
}

func (a *Foo) Bar(workerNumber int) {
    a.errg.Go(func() error {
        select {
        // simulates the long running clals
        case <-time.After(time.Second * time.Duration(rand.Intn(10))):
            fmt.Println(fmt.Sprintf("worker %d completed its work", workerNumber))
            return nil
        }
    })
}

func (a *Foo) Close() {
    a.errg.Wait()
}

func main() {
    foo := NewFoo()

    for i := 0; i < 10; i++ {
        foo.Bar(i)
    }

    <-time.After(time.Second * 5)
    fmt.Println("Waiting for workers to complete...")
    foo.Close()
    fmt.Println("Done.")
}

Преимущество здесь, если вы вводите обработку ошибок в свой код (вам следует), вам нужно лишь слегка изменить этот код: вкратце, errg.Wait() вернет первую ошибку redis, а Close() может распространить это вверх по стеку (на главную, в данном случае).

Используя пакет context.Context, вы также сможете немедленно отменить любой запущенный вызов redis, если произойдет сбой.Примеры этого есть в документации errgroup.

0 голосов
/ 06 февраля 2019

Я думаю, что бесконечное ожидание завершения всех процедур рутины - неправильный путь.Если одна из подпрограмм go блокируется или говорит, что она по какой-то причине зависает и не завершается успешно, что должно произойти, чтобы завершить процесс или дождаться завершения процедур go?

Вместо этого следует подождать с некоторым таймаутом и завершитьприложение независимо от того, все ли подпрограммы завершены или нет.

Редактировать: Оригинал и Спасибо @leaf bebop за указание на это.Я неправильно понял вопрос.

Пакет контекста может использоваться для сигнализации всех процедур go для обработки сигнала уничтожения.

appCtx, cancel := context.WithCancel(context.Background())

Здесь appCtx должен быть переданвсе подпрограммы go.

При входном сигнале вызова cancel().

функции, выполняемые как подпрограммы go, могут обрабатывать обработку контекста отмены.

Использование отмены контекстана ходу

0 голосов
/ 06 февраля 2019

Да, WaitGroup - правильный ответ.Вы можете использовать WaitGroup.Add в любое время, когда счетчик больше нуля, согласно doc .

Обратите внимание, что вызовы с положительной дельтой, которые происходят, когда счетчик равен нулю, должныслучиться до ожидания.Вызовы с отрицательной дельтой или звонки с положительной дельтой, которые начинаются, когда счетчик больше нуля, могут происходить в любое время.Как правило, это означает, что вызовы Add должны выполняться перед оператором, создающим программу или другое событие, которое следует ожидать.Если WaitGroup повторно используется для ожидания нескольких независимых наборов событий, новые вызовы Add должны произойти после того, как все предыдущие вызовы Wait будут возвращены.Смотрите пример WaitGroup.

Но есть одна хитрость в том, что вы должны всегда держать счетчик больше нуля, прежде чем вызывать Close.Обычно это означает, что вы должны позвонить wg.Add в NewFoo (или что-то в этом роде) и wg.Done в Close.И для предотвращения множественных вызовов на Done разрушение группы ожидания, вы должны заключить Close в sync.Once.Вы также можете запретить вызов новых Bar().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...