Попытка получить конвейер обработки FIFO с использованием канала - PullRequest
0 голосов
/ 21 января 2019

Я пытаюсь написать последовательный конвейер обработки в Go, используя простой поддельный пример. Он пересекает фальшивый каталог и запускает некоторые преобразования. Таким образом, между ними есть строковый канал. После того, как функция записывает данные, вторая функция читает их.

Мне кажется, что это работает и работает только последовательно, когда я помещаю ключевое слово go перед функцией WalkFakeDirectory, как показано в примере кода ниже ( детская площадка ).

Был бы очень признателен, если кто-нибудь может объяснить, как это работает?

package main 
import (
"fmt"
"strings"
"sync"
"time"
)

func main() {

done := make(chan int)
path := make(chan string)

defer close(done)

//var wg sync.WaitGroup - Not working too
//wg.Add(1)

fmt.Println("walking file path")

go WalkFakeDirectoy(done, path)

//wg.Add(1)

ConvertToUpperCase(path, done)

//wg.Wait()

fmt.Println("done!")
//time.Sleep(2000) // Not working 
}

func ConvertToUpperCase(files chan string, done chan int) {

   for file := range files {
      fmt.Println("converting data data", strings.ToUpper(file))
   }
}

func WalkFakeDirectoy(done chan int, path chan<- string) {

   func() {
    list := []string{"abc", "def", "fgh", "ijk", "mnn"}

    for _, file := range list {
        fmt.Println("getting data", file)
        path <- file
        time.Sleep(3000)
      }
   }()
}

1 Ответ

0 голосов
/ 21 января 2019

В этом блоге Go о конвейерах должно быть достаточно информации, чтобы создать свой собственный. Суть его в следующем примере кода:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}
func sample_pipeline() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)
    out2 := sq(out)

    // Consume the output.
    for o := range out2 {
        fmt.Println(o)
    }
}

func main() {
    sample_pipeline()
}

sq - этап конвейера - он принимает канал со входами и возвращает канал с выходами (квадраты значений входов). sample_pipeline устанавливает двухступенчатый конвейер и подключает к нему генератор с двумя значениями.

Обратите внимание на то, как завершается обработка - каждый этап конвейера представляет собой процедуру, которая выполняет этап конвейера (ожидание новых данных из входного канала, их обработка, отправка вывода). Когда входной канал каждой ступени завершен (цикл диапазона на канале останавливается), он закрывает свой собственный канал. Закрытие канала - это канонический способ сигнализации "этот канал завершен" в Go.

...