Синхронизация для нескольких программ с использованием каналов - PullRequest
0 голосов
/ 13 мая 2018

Мне нужно запустить несколько рабочих с одной очередью задач и одной очередью результатов.Каждый работник должен быть запущен в разные рутины.И мне нужно подождать, пока все рабочие не будут закончены, и очередь задач будет пуста, прежде чем выходить из программы.Я подготовил небольшой пример для синхронизации goroutine.Основная идея заключалась в том, что мы просчитываем задачи в очереди и ждем, пока все рабочие закончат работуНо текущая реализация иногда пропускает ценности.Почему это происходит и как решить проблему?Пример кода:

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
)

const num_workers = 5

type workerChannel chan uint64

// Make channel for tasks
var workCh workerChannel
// Make channel for task counter
var cntChannel chan int

// Task counter
var tskCnt int64

// Worker function
func InitWorker(input workerChannel, result chan string, num int) {
    for {
        select {
        case inp := <-input:
            getTask()
            result <- ("Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10))
        }
    }
}

// Function to manage task counter
// should be in uniq goroutine
func taskCounter(inp chan int) {
    for {
        val := <-inp
        tskCnt += int64(val)
    }
}

// Put pask to the queue
func putTask(val uint64) {
    func() {
        fmt.Println("Put ", val)
        cntChannel <- int(1)
        workCh <- val
    }()
}

// Get task from queue
func getTask() {
    func() {
        cntChannel <- int(-1)
    }()
}

func main() {
// Init service channels
    abort := make(chan os.Signal)
    done := make(chan bool)

// init queue for results
    result := make(chan string)

// init task queue
    workCh = make(workerChannel)

// start some workers
    for i := uint(0); i < num_workers; i++ {
        go InitWorker(workCh, result, int(i))
    }

// init counter for synchro
    cntChannel = make(chan int)
    go taskCounter(cntChannel)

// goroutine that put some tasks into queue
    go func() {
        for i := uint(0); i < 21; i++ {
            putTask(uint64(i))
        }

        // wait for processing all tasks and close application
        for len(cntChannel) != 0 {}
        for tskCnt != 0 {}
        for len(workCh) != 0 {}
        for len(result) != 0 {}

        // send signal for close
        done <- true
    }()

    signal.Notify(abort, os.Interrupt)
    for {
        select {
        case <-abort:
            fmt.Println("Aborted.")
            os.Exit(0)

        // print results
        case res := <-result:
            fmt.Println(res)

        case <-done:
            fmt.Println("Done")
            os.Exit(0)
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 13 мая 2018

Я предлагаю использовать close (chan) для такого рода задач.

Версия WaitGroup.

package main

import (
    "log"
    "sync"
)

func worker(in chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range in {
        log.Println(i)
    }

}

func main() {
    in := make(chan int)
    lc := 25
    maxValue := 30
    wg := sync.WaitGroup{}
    wg.Add(lc)
    for i := 0; i < lc; i++ {
        go worker(in, &wg)
    }

    for c := 0; c <= maxValue; c++ {
        in <- c
    }
    close(in)
    wg.Wait()
}

Версия канала

package main

import (
    "log"
    "os"
)

func worker(in chan int, end chan struct{}) {
    defer func() { end <- struct{}{} }()
    for i := range in {
        log.Println(i)
    }

}

func main() {
    in := make(chan int)
    lc := 25
    maxValue := 30
    end := make(chan struct{})
    var fin int
    go func() {
        for {
            <-end
            fin++
            log.Println(`fin`, fin)
            if fin == lc {
                break
            }
        }
        close(end)
        os.Exit(0)
    }()
    for i := 0; i < lc; i++ {
        go worker(in, end)
    }

    for c := 0; c <= maxValue; c++ {
        in <- c
    }
    close(in)
    <-make(chan struct{})
}
0 голосов
/ 13 мая 2018

Используйте sync.WaitGroup , чтобы дождаться завершения выполнения процедур.Закройте каналы, чтобы вызвать чтение циклов на каналах.

package main

import (
    "fmt"
    "sync"
)

type workerChannel chan uint64

const num_workers = 5

func main() {

    results := make(chan string)
    workCh := make(workerChannel)

    // Start workers
    var wg sync.WaitGroup
    wg.Add(num_workers)
    for i := 0; i < num_workers; i++ {
        go func(num int) {
            defer wg.Done()
            // Loop processing work until workCh is closed
            for w := range workCh {
                results <- fmt.Sprintf("worker %d, task %d", num, w)
            }

        }(i)
    }

    // Close result channel when workers are done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Send work to be done
    go func() {
        for i := 0; i < 21; i++ {
            workCh <- uint64(i)
        }
        // Closing the channel causes workers to break out of loop
        close(workCh)
    }()

    // Process results. Loop exits when result channel is closed.
    for r := range results {
        fmt.Println(r)
    }
}

https://play.golang.org/p/ZifpzsP6fNv

...