Как реализовать конвейер до рутин? - PullRequest
0 голосов
/ 09 февраля 2019

Мне нужна помощь в понимании того, как использовать конвейер для передачи данных из одной программы в другую.

Я прочитал пост блога golang на конвейере , я понял, но не смогя не смог полностью реализовать его и, таким образом, подумал, что ищет помощи у сообщества.

Теперь я придумал этот уродливый код ( Playground ):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    wg := sync.WaitGroup{}
    ch := make(chan int)
    for a := 0; a < 3; a++ {
        wg.Add(1)
        go func1(int(3-a), ch, &wg)
    }
    go func() {
        wg.Wait()
        close(ch)
    }()
    wg2 := sync.WaitGroup{}
    ch2 := make(chan string)
    for val := range ch {
        fmt.Println(val)
        wg2.Add(1)
        go func2(val, ch2, &wg2)
    }
    go func() {
        wg2.Wait()
        close(ch2)
    }()
    for val := range ch2 {
        fmt.Println(val)
    }
}

func func1(seconds int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    time.Sleep(time.Duration(seconds) * time.Second)
    ch <- seconds
}

func func2(seconds int, ch chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    ch <- "hello"
}

Проблема

Я хочу сделать это надлежащим образом, используя конвейеры или любой другой подходящий для этого способ.

Кроме того, конвейер, показанный в посте блога, недля goroutines и, следовательно, я не могу сделать это сам.

В реальной жизни эти func1 и func2 являются функциями, которые извлекают ресурсы из Интернета, и, следовательно, они запускаются в своей собственной программе..

Спасибо.
Temporarya
(Голографический нуби)

PS Примеры из реальной жизни и использование конвейера с использованием goroutines также очень помогли бы.

1 Ответ

0 голосов
/ 09 февраля 2019

Ключевой шаблон этого сообщения конвейера состоит в том, что вы можете просматривать содержимое канала в виде потока данных и писать набор взаимодействующих процедур, которые формируют граф потока обработки данных.Это может быть способом получить некоторый параллелизм в ориентированном на данные приложении.

С точки зрения дизайна, вам также может оказаться более полезным создавать блоки, которые не привязаны к структуре goroutine, и оборачиватьих в каналах.Это значительно упрощает тестирование кода нижнего уровня, и если вы передумаете запускать вещи в обычной программе или нет, проще добавить или удалить оболочку.

Так что в вашем примере я быначните с рефакторинга задач самого низкого уровня в их собственные (синхронные) функции:

func fetch(ms int) int {
    time.Sleep(time.Duration(ms) * time.Millisecond)
    return ms
}

func report(ms int) string {
    return fmt.Sprintf("Hello after %d ms", ms)
}

Поскольку вторая половина вашего примера довольно синхронна, ее легко адаптировать к шаблону конвейера.Мы пишем функцию, которая потребляет весь свой входной поток и производит полный выходной поток, закрывая его, когда это будет сделано.

func reportAll(mss <-chan int, out chan<- string) {
    for ms := range mss {
        out <- report(ms)
    }
    close(out)
}

Функция, которая вызывает асинхронный код, немного сложнее.В главном цикле функции каждый раз, когда вы читаете значение, вам нужно запустить программу для его обработки.Затем, после того, как вы прочитали все из входного канала, вам нужно дождаться завершения всех этих процедур, прежде чем закрывать выходной канал.Здесь вы можете использовать небольшую анонимную функцию, чтобы помочь.

func fetchAll(mss <-chan int, out chan<- int) {
    var wg sync.WaitGroup
    for ms := range mss {
        wg.Add(1)
        go func(ms int) {
            out <- fetch(ms)
            wg.Done()
        }(ms)
    }
    wg.Wait()
    close(out)
}

Здесь также полезно (поскольку записи каналов блокируются), чтобы написать другую функцию для заполнения входных значений.

func produceInputs(mss chan<- int) {
    for ms := 1000; ms > 0; ms -= 300 {
        mss <- ms
    }
    close(mss)
}

Теперь вашей основной функции необходимо создать каналы между ними и запустить конечного потребителя.

// main is the entry point to the program.
//
//                   mss        fetched       results
//     produceInputs --> fetchAll --> reportAll --> main
func main() {
    mss := make(chan int)
    fetched := make(chan int)
    results := make(chan string)

    go produceInputs(mss)
    go fetchAll(mss, fetched)
    go reportAll(fetched, results)

    for val := range results {
        fmt.Println(val)
    }
}

https://play.golang.org/p/V9Z7ECUVIJL - полный пример.

Я избегал ручного обходаsync.WaitGroup здесь (и, как правило, это делается в общем случае: у вас не будет WaitGroup, если вы явно не вызываете что-то как верхний уровень программы, поэтому продвижение управления WaitGroup до вызывающей стороны делает код более модульным; см. мою fetchAll функцию выше для примера).Как я узнаю, что все мои программы закончились?Мы можем проследить:

  • Если я достиг конца main, канал results закрыт.
  • Канал results является выходным каналомreportAll;если он закрылся, то эта функция достигла конца своего выполнения;и если это произошло, то канал fetched закрывается.
  • Канал fetched является выходным каналом fetchAll;...

Еще один способ взглянуть на это состоит в том, что как только источник конвейера (produceInputs) закрывает свой выходной канал и завершает работу, сигнал "Я готов" поступает по конвейеруи заставляет последующие шаги закрывать свои выходные каналы и заканчивать тоже.

В блоге упоминается отдельный явный канал закрытия.Я не вдавался в это здесь вообще.Однако, поскольку она была написана, стандартная библиотека получила пакет context , который теперь является стандартной идиомой для управления ими.Вы должны будете использовать оператор select в теле основного цикла, что делает обработку немного более сложной.Это может выглядеть так:

func reportAllCtx(ctx context.Context, mss <-chan int, out chan<- string) {
    for {
        select {
            case <-ctx.Done():
                break
            case ms, ok := <-mss:
                if ok {
                    out <- report(ms)
                } else {
                    break
                }
            }
        }
    }
    close(out)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...