Реализация конвейера рабочих функций - PullRequest
0 голосов
/ 14 октября 2018

Я реализую конвейер из нескольких рабочих функций, связанных каналами.Все они получают (in, out chan interface{}) в качестве входа (каждая функция получает out от предыдущей как in)

У меня нет никаких гарантий, что out будет закрыта в конце каждогофункция, поэтому мне интересно, как я должен проверить, выполнила ли предыдущая функция свою работу.Я начал с чего-то вроде этого:

func ExecutePipeline(jobs ...job) {
    out := make(chan interface{}, 10)
    for _, val := range jobs {
        in := out
        out := make(chan interface{})
        go val(in, out)
    }
}

Я думаю о том, чтобы каким-то образом использовать WaitGroup, чтобы использовать конец функции в качестве индикатора того, что она выполнила свою работу и закрыла выходной канал.
Как я могу это сделать?

1 Ответ

0 голосов
/ 14 октября 2018

Если вы намереваетесь распространить сигнал по конвейеру для связи, когда предыдущие этапы конвейера завершены, и не будут давать никаких дополнительных значений, вы можете сделать это синхронно, закрыв канал после каждого этапа конвейера.Следующий код делает это путем переноса вызова каждого работника конвейера:

func startWork(val job, in, out chan interface{}) {
    val(in, out)
    // out will be closed after val returns
    close(out)
}

// Later, in ExecutePipeline, start your worker by calling startWork
func ExecutePipeline(jobs ...job) {
    // ...
    for _, val := range jobs {
        // ...
        go startWork(val, in, out)
    }
}

Предотвращение закрытия нескольких каналов

У меня нет никаких гарантий, что outбудет закрыт в конце каждой функции

И наоборот, если какой-либо рабочий может закрыть канал, это проблематично;последующий вызов в startWork для закрытия канала вызовет панику, если вы попытаетесь закрыть уже закрытый канал.

В этой простой реализации работники должны делегировать закрытие канала коду, который контролирует конвейер, чтобы избежатьваша программа для паники.


Обработка внутриполосной сигнализации

Поскольку сигнализация передается внутриполосной (в том же канале, что и данные), может потребоваться осторожность при реализацииваших сотрудников конвейера, чтобы убедиться, что они различают

  • чтения значения из открытого канала и
  • чтения нулевого значения из закрытого канала

range по каналу в цикле for автоматически прервет цикл при закрытии канала.Если вы реализуете свою собственную логику для чтения из канала, вам нужно будет определить, когда тривиальное успешное чтение выполняется с нулевым значением, потому что канал закрыт.Это может быть достигнуто с помощью многозначной формы назначения оператора приема , которая будет возвращать логическое значение, когда чтение из канала имело нулевое значение, поскольку канал был закрыт и пуст.

func someWorker(in, out chan interface{}) {
    for {
        val, open := <-in
        if !open {
            // Read of val was the zero value of "in", indicating the channel
            // is closed.
            break // or, if appropriate, return
        }
    }
}
...