Где мне закрыть канал в этом конкретном примере c? - PullRequest
0 голосов
/ 25 мая 2020

Я пишу просто простой Go конвейер, цель - получить URL-адреса и статус печати. ​​

При fetchUrl мне нужно закрыть канал, чтобы уведомить основной, не будет данных так что отпустите основную процедуру go. Однако я не могу закрыть канал функции fetchurl после l oop, потому что это будет слишком рано. Я не хочу добавлять группы ожидания в приложение, потому что вся моя цель на данный момент - понять каналы.

В функции fetchurl канал, называемый two, предназначен для того, чтобы просто убедиться, что одновременно будет только 2 задания.

package main

import (
    "fmt"
    "net/http"
    "os"
)

func gen(val []string) <-chan string {
    out := make(chan string, len(val))
    for _, val := range val {
        out <- val
    }
    close(out)
    return out
}

func fetchUrl(in <-chan string) <-chan string {
    out := make(chan string)
    two := make(chan struct{}, 2)
    fmt.Println("blocked")
    for url := range in {
        two <- struct{}{}
        go fetchWorker(url, two, out)
    }

    return out
}

func fetchWorker(url string, two chan struct{}, out chan string) {
    res, err := http.Get("https://" + url)
    if err != nil {
        panic(err)
    }
    <-two
    out <- fmt.Sprintf("[%d] %s\n", res.StatusCode, url)
}

func main() {
    for val := range fetchUrl(gen(os.Args[1:])) {
        fmt.Println(val)
    }
}

1 Ответ

3 голосов
/ 25 мая 2020

Вам необходимо закрыть канал out после того, как в него были записаны все результаты. Самый простой способ узнать об этом - когда все рабочие горутины завершились, а самый простой способ определить это, в свою очередь, - использовать sync.WaitGroup. (В Go каналы и горутины являются очень тесно связанными понятиями, поэтому управление горутинами является частью работы с каналами.)

В существующем коде вы можете ie это в свою fetchUrl функцию :

var wg sync.WaitGroup
for url := range in {
    two <- struct{}{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        fetchWorker(url, two, out)
    }()
}
wg.Wait()
close(out)

Другая структурная проблема, с которой вы столкнетесь с написанным кодом, заключается в том, что оба gen и fetchUrl создают каналы, запускают весь код, который должен записывать в каналы, и верните канал только после того, как эти авторы закончат; поскольку до возврата из функции ничего нельзя прочитать из канала, это приведет к тупикам. Вы можете обойти это, создав все каналы на верхнем уровне и передав их в функции генератора.

Если вы хотите, чтобы ровно два воркера читали из одной и той же очереди URL-адресов, стандартный шаблон - просто запустить две горутины для чтения и записи из одних и тех же каналов. Например, вы могли бы переписать fetchWorker как

func fetchWorker(urls <-chan string, out chan<- string) {
    for url := range urls {
        res, err := http.Get("https://" + url)
        if err != nil {
            panic(err)
        }
        out <- fmt.Sprintf("[%d] %s\n", res.StatusCode, url)
    }
}

На верхнем уровне создать каналы, создать рабочих, подать вход и потреблять выход.

func main() {
    urls := make(chan string)
    out := make(chan string)

    // Launch a goroutine to feed data into urls, then
    // close(urls), then stop
    go gen(os.Args[1:], urls)

    // Launch worker goroutines
    workerCount := 2
    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fetchWorker(urls, out)
        }()
    }

    // Launch a dedicated goroutine to close the channel
    go func() {
        wg.Wait()
        close(out)
    }()

    // Read the results
    for result := range(out) {
        fmt.Println(result)
    }
}
...