Канал на основе канала застревает - PullRequest
0 голосов
/ 13 января 2020

Я пытаюсь построить конвейер для приема данных с Go. 3 этапа: «Загрузка пакетов», «Преобразование каждого сообщения» и «Добавление сообщений в очередь».

Логика c, которая мне показалась естественной, состоит в том, чтобы создать 3 функции для каждого этапа и t ie эти функции с небуферизованными каналами.

Где-то в моем коде я неправильно реализую каналы или не использую группы ожидания? Поскольку только 1 сообщение попадает в финальную стадию, и программа, кажется, останавливается / блокируется.

    func (c *worker) startWork() {
        // channel for messages to be sent to the queue
        chMessagesToEnqueue := make(chan types.Foo)

        // channel for messages to be transformed
        chMessagesToTransform := make(chan []types.UpstreamFooType)

        // start the goroutines with the channels
        go c.startTransformer(chMessagesToTransform, chMessagesToEnqueue)
        go c.startEnqueuer(chMessagesToEnqueue)
        go c.startDownloader(chMessagesToTransform)
    }

    func (c *worker) startDownloader(out chan []types.UpstreamFooType) {
        // https://github.com/SebastiaanKlippert/go-soda
        // uses a library here to fetch data from upstream APIs, but the gist is:
        var wg sync.WaitGroup
        for i := 0; i < c.workerCount; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for {
                    // i've cut out some meat out to make more concise
                    var results []types.UpstreamFooType
                    err = json.NewDecoder(resp.Body).Decode(&results)
                    out <- results
                }
            }()
        }
        wg.Wait()        
    }

    func (c *worker) startTransformer(in <-chan []types.UpstreamFooType, out chan types.Foo) {
        data := <-in
        for _, record := range data {
            msg := types.Foo{
                name: record.fifa,
            }
            out <- msg
        }
    }

    func (c *worker) startEnqueuer(in <-chan []types.Foo) {
        data := <-in
        c.logger.Infow("startEnqueuer", "data", data)
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...