Вот канальная версия вашего кода, функционально эквивалентная намерению примера выше. Ключевым моментом является то, что мы не используем какие-либо значения атома c для изменения логики c кода, потому что это не обеспечивает синхронизацию между программами. Все взаимодействия между программами синхронизируются с использованием каналов sync.WaitGroup
или context.Context
. Вероятно, есть более эффективные способы решения этой проблемы, но это демонстрирует, что нет никакой атомарности, необходимой для координации очереди и рабочих.
Единственное значение, которое все еще остается несогласованным между процедурами, здесь - это использование len(jobs)
в логе вывода. Независимо от того, имеет ли смысл его использовать или нет, зависит от вас, поскольку его значение не имеет смысла в параллельном мире, но оно безопасно , поскольку оно синхронизировано для одновременного использования и отсутствует логика c, основанная на значение.
buf := 5
workers := 3
jobs := make(chan int, buf)
// results buffer must always be larger than workers + buf to prevent deadlock
results := make(chan int, buf*2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start workers
var wg sync.WaitGroup
for n := 0; n < workers; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for jobID := range jobs {
fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))
time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)
results <- jobID
}
fmt.Printf("worker %v exited", n)
}(n)
}
var done sync.WaitGroup
done.Add(1)
go func() {
defer done.Done()
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Printf("===> results: %v\n", strings.Join(r, ","))
r = r[:0]
}
for {
select {
case <-ticker.C:
flushResults()
// send max buf jobs, or fill the queue
for i := 0; i < buf; i++ {
jobID++
select {
case jobs <- jobID:
continue
}
break
}
fmt.Printf("===> send %v jobs\n", i)
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-ctx.Done():
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
// we can close results for easy iteration because we know
// there are no more workers.
close(results)
// Flush remaining results
for jobID := range results {
r = append(r, fmt.Sprintf("%v", jobID))
}
flushResults()
return
}
}
}()