Можно ли добавлять элементы в канал Go во время обработки? - PullRequest
0 голосов
/ 17 апреля 2020

Я пытаюсь найти способ рекурсивного прохождения задачи в Go с использованием goroutines. Цель программы - поместить входной элемент в канал и добавить к выходному каналу вход -1, пока не будет достигнут 0. Количество рабочих обработки должно быть адаптируемым. Процесс, которому я следую, выглядит следующим образом:

Создайте входной выходной канал. Добавьте начальный номер к входному каналу. Инициализируйте работников для запуска рабочей функции. L oop и распечатайте выходы в выходном канале.

func main() {
    inputChannel := make(chan int, 1)
    outputChannel := make(chan int)
    inputChannel <- 100
    numWorkers := 4
    for i := 0; i < numWorkers; i++ {
        go worker(inputChannel, outputChannel)
    }
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
}

Далее, в функции заказа мы пропускаем oop элементы во входном канале, каждый раз проверяя, есть ли еще элементы для приема. Если есть еще элементы для приема, мы печатаем элемент ввода, вычитаем 1 из элемента и отправляем во входной канал для другого работника, чтобы выбрать, если элемент больше 0. Если во входном канале ничего не осталось, тогда мы return.

func worker(input chan int, output chan<- int) {
    defer close(input)
    defer close(output)
    for {
        element, more := <-input
        if more {
            fmt.Println("Input: ", element)
            element--
            if element != 0 {
                input <- element
            }
        } else {
            fmt.Println("All Jobs Processed")
            return
        }
    }
}

Вывод, который я вижу:

Input:  100
Input:  99
Input:  98
Input:  97
Input:  96
Input:  95
Input:  94
Input:  93
Input:  92
Input:  91
Input:  90
Input:  89
Input:  88
Input:  87
Input:  86
Input:  85
Input:  84
Input:  83
Input:  82
Input:  81
Input:  80
Input:  79
Input:  78
Input:  77
Input:  76
Input:  75
Input:  74
Input:  73
Input:  72
Input:  71
Input:  70
Input:  69
Input:  68
Input:  67
Input:  66
Input:  65
Input:  64
Input:  63
Input:  62
Input:  61
Input:  60
Input:  59
Input:  58
Input:  57
Input:  56
Input:  55
Input:  54
Input:  53
Input:  52
Input:  51
Input:  50
Input:  49
Input:  48
Input:  47
Input:  46
Input:  45
Input:  44
Input:  43
Input:  42
Input:  41
Input:  40
Input:  39
Input:  38
Input:  37
Input:  36
Input:  35
Input:  34
Input:  33
Input:  32
Input:  31
Input:  30
Input:  29
Input:  28
Input:  27
Input:  26
Input:  25
Input:  24
Input:  23
Input:  22
Input:  21
Input:  20
Input:  19
Input:  18
Input:  17
Input:  16
Input:  15
Input:  14
Input:  13
Input:  12
Input:  11
Input:  10
Input:  9
Input:  8
Input:  7
Input:  6
Input:  5
Input:  4
Input:  3
Input:  2
Input:  1
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:31 +0x179

goroutine 6 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4

goroutine 7 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4

goroutine 8 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4

goroutine 9 [chan receive]:
main.worker(0xc00004e070, 0xc00005c060)
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:9 +0xac
created by main.main
        /Users/ianmitchell/go/src/github.com/iancmitchell/channel-recursion/main.go:29 +0xc4
exit status 2 

Я пробовал это множеством способов, полагаясь на каналы вроде этого и используя группы ожидания, но я не могу показаться заставить процесс работать через все элементы и выдавать результаты.

1 Ответ

1 голос
/ 17 апреля 2020

Хорошо, вот мы go. Во-первых, обратите внимание, что в вашем коде есть некоторые проблемы. Затем исправьте их.

  • Как сказал Адриан , прочитайте канал, который уже закрыт или не имеет элемента. В твоем рабочем веселье c ты этим занимаешься. Это происходит, когда вы читаете элементы из входного канала после закрытия входного канала другим работником.

    func worker(input chan int, output chan<- int) {
        defer close(input)
        ...
        for {
            element, more := <-input
            ...
        }
    }
    

    Итак, почему вы не закрываете входной канал после того, как все рабочие завершают sh?

  • После решения проблемы с вашим входным каналом возникнет еще один, когда вы пытаетесь читать с выходного канала. Более того, вы ничего не отправляете по выходному каналу. Если вам не нужен этот канал, то почему вы используете этот. А также этот выходной канал не буферизован (канал размера 0 и send-receive должны быть одновременно, иначе возникнет ситуация взаимоблокировки). Смотрите, буферизованные и небуферизованные от здесь и здесь . Может быть, есть более полезные документы в Интернете. Спасибо моему другу Nightfury1204 за первую ссылку о буферизированном и небуферизованном канале из его этого поста .

    outputChannel := make(chan int) // unbuffered, no size is defined
    ...
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
    

    Итак, если вы хотите что-то отправить на выходной канал, тогда logi c ваш собственный. Например, вы можете отправить что-то после завершения обработки входного канала в рабочих. В этом случае объявите ваш выходной канал как буферизованный, имеющий длину 4 (с 4 рабочими у вас работает). После завершения всех ваших работников закройте ваш выходной канал и затем прочитайте.

    outputChannel := make(chan int, 4) // buffered
    ...
    // after finishing all your workers
    close(outputChannel)
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
    

В качестве примечания используйте sync.WaitGroup из пакета "sync" дождаться набора горутин до фини sh.

См. пример ниже: https://play.golang.org/p/WAqwyR0ggNN

import "fmt"
import "sync"

func main() {
    inputChannel := make(chan int, 1)
    outputChannel := make(chan int, 4)

    var wg sync.WaitGroup
    wg.Add(4)

    inputChannel <- 100
    numWorkers := 4
    for i := 0; i < numWorkers; i++ {
        go func() {
            defer wg.Done()
            for {
                select {
                case element := <-inputChannel:
                    fmt.Println("Input: ", element)
                    element--
                    if element != 0 {
                        inputChannel <- element
                    }
                default:
                    outputChannel<-0
                    fmt.Println("All Jobs Processed", len(outputChannel))
                    return
                }
            }
        }()
    }
    wg.Wait()
    close(inputChannel)
    close(outputChannel)
    for elem := range outputChannel {
        fmt.Println("Output: ", elem)
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...