Если вы намереваетесь распространить сигнал по конвейеру для связи, когда предыдущие этапы конвейера завершены, и не будут давать никаких дополнительных значений, вы можете сделать это синхронно, закрыв канал после каждого этапа конвейера.Следующий код делает это путем переноса вызова каждого работника конвейера:
func startWork(val job, in, out chan interface{}) {
val(in, out)
// out will be closed after val returns
close(out)
}
// Later, in ExecutePipeline, start your worker by calling startWork
func ExecutePipeline(jobs ...job) {
// ...
for _, val := range jobs {
// ...
go startWork(val, in, out)
}
}
Предотвращение закрытия нескольких каналов
У меня нет никаких гарантий, что out
будет закрыт в конце каждой функции
И наоборот, если какой-либо рабочий может закрыть канал, это проблематично;последующий вызов в startWork
для закрытия канала вызовет панику, если вы попытаетесь закрыть уже закрытый канал.
В этой простой реализации работники должны делегировать закрытие канала коду, который контролирует конвейер, чтобы избежатьваша программа для паники.
Обработка внутриполосной сигнализации
Поскольку сигнализация передается внутриполосной (в том же канале, что и данные), может потребоваться осторожность при реализацииваших сотрудников конвейера, чтобы убедиться, что они различают
- чтения значения из открытого канала и
- чтения нулевого значения из закрытого канала
range
по каналу в цикле for
автоматически прервет цикл при закрытии канала.Если вы реализуете свою собственную логику для чтения из канала, вам нужно будет определить, когда тривиальное успешное чтение выполняется с нулевым значением, потому что канал закрыт.Это может быть достигнуто с помощью многозначной формы назначения оператора приема , которая будет возвращать логическое значение, когда чтение из канала имело нулевое значение, поскольку канал был закрыт и пуст.
func someWorker(in, out chan interface{}) {
for {
val, open := <-in
if !open {
// Read of val was the zero value of "in", indicating the channel
// is closed.
break // or, if appropriate, return
}
}
}