группа ожидания на подмножестве подпрограмм go - PullRequest
0 голосов
/ 07 сентября 2018

У меня есть ситуация, когда в основных подпрограммах го создаются подпрограммы «х» го. но он заинтересован только в "y" (y

Я надеялся использовать Waitgroup. Но Waitgroup только позволяет мне ждать на всех ходу рутины. Я не могу, например, сделать это,

1. wg.Add (y)
2 create "x" go routines. These routines will call wg.Done() when finished. 
3. wg. Wait()

Это паника, когда процедура y + 1 go вызывает wg.Done (), потому что счетчик wg становится отрицательным.

Я уверен, что могу использовать каналы, чтобы решить эту проблему, но мне интересно, решит ли это Waitgroup.

Ответы [ 3 ]

0 голосов
/ 07 сентября 2018

Это y конкретные подпрограммы, которые вы пытаетесь отследить, или какие-либо y вне x? Каковы критерии?

Обновление:

1. Если у вас есть контроль над какими-либо критериями, выберите matching y go-рутины:

Вы можете сделать wp.wg.Add(1) и wp.wg.Done() изнутри группы, основываясь на вашем условии, передав его в качестве аргумента-указателя в программу, если ваше состояние не может быть проверено за пределами программы.

Что-то вроде приведенного ниже примера кода. Будет более точным, если вы предоставите более подробную информацию о том, что вы пытаетесь сделать.

func sampleGoroutine(z int, b string, wg *sync.WaitGroup){

    defer func(){
        if contition1{
            wg.Done()
        }
    }

    if contition1 {
        wg.Add(1)
        //do stuff
    }
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < x; i++ {
        go sampleGoroutine(1, "one", &wg)
    }
    wg.Wait()
}

2. Если вы не можете контролировать, какие из них, и просто хотите first y:

Исходя из вашего комментария, у вас нет контроля / желания выбирать какие-либо конкретные программы, кроме тех, которые заканчиваются первыми. Если вы хотите сделать это универсальным способом, вы можете использовать приведенную ниже пользовательскую реализацию waitGroup, которая соответствует вашему варианту использования. (Хотя это не защищено от копирования. Также нет / нужен метод wg.Add (int))

type CountedWait struct {
    wait  chan struct{}
    limit int
}

func NewCountedWait(limit int) *CountedWait {
    return &CountedWait{
        wait:  make(chan struct{}, limit),
        limit: limit,
    }
}

func (cwg *CountedWait) Done() {
    cwg.wait <- struct{}{}
}

func (cwg *CountedWait) Wait() {
    count := 0
    for count < cwg.limit {
        <-cwg.wait
        count += 1
    }
}

Что можно использовать следующим образом:

func sampleGoroutine(z int, b string, wg *CountedWait) {

    success := false

    defer func() {
        if success == true {
            fmt.Printf("goroutine %d finished successfully\n", z)
            wg.Done()
        }
    }()

    fmt.Printf("goroutine %d started\n", z)
    time.Sleep(time.Second)

    if rand.Intn(10)%2 == 0 {
        success = true
    }
}

func main() {
    x := 10
    y := 3
    wg := NewCountedWait(y)

    for i := 0; i < x; i += 1 {
        // Wrap our work function with the local signalling logic
        go sampleGoroutine(i, "something", wg)
    }

    wg.Wait()

    fmt.Printf("%d out of %d goroutines finished successfully.\n", y, x)
}

3. Вы также можете забить в context с 2, чтобы гарантировать, что оставшиеся goroutines не просачиваются Возможно, вы не сможете запустить это на play.golang, так как он долго спит.

Ниже приведен пример вывода: (обратите внимание, что может быть больше y = 3 goroutines, отмечающих Done, но вы ждете только до 3 финиша)

goroutine 9 started goroutine 0 started goroutine 1 started goroutine 2 started goroutine 3 started goroutine 4 started goroutine 5 started goroutine 5 marking done goroutine 6 started goroutine 7 started goroutine 7 marking done goroutine 8 started goroutine 3 marking done continuing after 3 out of 10 goroutines finished successfully. goroutine 9 will be killed, bcz cancel goroutine 8 will be killed, bcz cancel goroutine 6 will be killed, bcz cancel goroutine 1 will be killed, bcz cancel goroutine 0 will be killed, bcz cancel goroutine 4 will be killed, bcz cancel goroutine 2 will be killed, bcz cancel

Воспроизведение ссылок

  1. https://play.golang.org/p/l5i6X3GClBq
  2. https://play.golang.org/p/Bcns0l9OdFg
  3. https://play.golang.org/p/rkGSLyclgje
0 голосов
/ 07 сентября 2018

Как отмечено в ответ Адриана , sync.WaitGroup - это простой счетчик, метод которого Wait будет блокироваться, пока значение счетчика не достигнет нуля. Он предназначен для того, чтобы позволить вам блокировать (или объединять) ряд процедур, прежде чем продолжить основной поток выполнения.

Интерфейс WaitGroup недостаточно выразителен для вашего случая использования и не предназначен для этого. В частности, вы не можете использовать его наивно, просто вызывая wg.Add(y) (где y wg.Done с помощью (y + 1) th goroutine вызовет панику , так как группа ожидания имеет отрицательное внутреннее значение. Кроме того, мы не можем быть «умными», наблюдая значение внутреннего счетчика WaitGroup; это нарушит абстракцию, и в любом случае ее внутреннее состояние не будет экспортировано.


Реализуй своё!

Вы можете реализовать соответствующую логику самостоятельно, используя несколько каналов согласно приведенному ниже коду ( ссылка на игровую площадку ). Заметьте из консоли, что запущено 10 goroutines, но после того, как два из них завершены, мы выпали, чтобы продолжить выполнение в методе main.

package main

import (
    "fmt"
    "time"
)

// Set goroutine counts here
const (
    // The number of goroutines to spawn
    x = 10
    // The number of goroutines to wait for completion
    // (y <= x) must hold.
    y = 2
)

func doSomeWork() {
    // do something meaningful
    time.Sleep(time.Second)
}

func main() {
    // Accumulator channel, used by each goroutine to signal completion.
    // It is buffered to ensure the [y+1, ..., x) goroutines do not block
    // when sending to the channel, which would cause a leak. It will be
    // garbage collected when all goroutines end and the channel falls
    // out of scope. We receive y values, so only need capacity to receive
    // (x-y) remaining values.
    accChan := make(chan struct{}, x-y)

    // Spawn "x" goroutines
    for i := 0; i < x; i += 1 {
        // Wrap our work function with the local signalling logic
        go func(id int, doneChan chan<- struct{}) {
            fmt.Printf("starting goroutine #%d\n", id)
            doSomeWork()
            fmt.Printf("goroutine #%d completed\n", id)

            // Communicate completion of goroutine
            doneChan <- struct{}{}
        }(i, accChan)
    }

    for doneCount := 0; doneCount < y; doneCount += 1 {
        <-accChan
    }

    // Continue working
    fmt.Println("Carrying on without waiting for more goroutines")
}

Избегайте утечки ресурсов

Так как это не ожидает завершения выполнения процедур [y + 1, ..., x), вы должны уделить особое внимание функции doSomeWork, чтобы устранить или минимизировать риск того, что работа может блокироваться на неопределенный срок, что также приведет к утечке. Удалите, где это возможно, возможность неограниченной блокировки ввода-вывода (включая операции с каналами) или попадания в бесконечные циклы.

Вы можете использовать context, чтобы сигнализировать дополнительным процедурам, когда их результаты больше не требуются, чтобы они прекратили выполнение.

0 голосов
/ 07 сентября 2018

WaitGroup на самом деле не ожидает выполнения процедур, а ждет, пока его внутренний счетчик не достигнет нуля. Если вы наберете только Add() количество нужных вам подпрограмм и наберете Done() только в тех подпрограммах, которые вас интересуют, то Wait() будет блокировать только до тех пор, пока не завершится выполнение тех подпрограмм, которые вам нужны. Вы полностью контролируете логику и поток, нет никаких ограничений на то, что WaitGroup «позволяет».

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...