go процедура ожидает ответа от канала и продолжает - PullRequest
0 голосов
/ 08 мая 2020

Я изучаю go параллелизм, и я хотел реализовать простой пример, который берет строки из матрицы и добавляет массив (срез) значений в каждую строку.

Поскольку я использую каналы, я пытаюсь подождать, пока каждая строка получит соответствующий результат от горутины. Однако это не лучше, чем просто делать это синхронно. Как заставить каждую строку ждать своего соответствующего результата и позволить другим строкам вычислять свои результаты одновременно?

https://play.golang.org/p/uCOGwOBeIQL


package main

import "fmt"


/*
Array:
0 1 2 3 4 5 6 7 8 9

+

Matrix:
1 0 0 0 0 0 0 0 0 0
0 1 0 0 0 0 0 0 0 0
0 0 1 0 0 0 0 0 0 0
0 0 0 1 0 0 0 0 0 0
0 0 0 0 1 0 0 0 0 0
0 0 0 0 0 1 0 0 0 0
0 0 0 0 0 0 1 0 0 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 0 0 0 1 0
0 0 0 0 0 0 0 0 0 1

-> 
Expected result:
1 1 2 3 4 5 6 7 8 9
0 2 2 3 4 5 6 7 8 9
0 1 3 3 4 5 6 7 8 9
0 1 2 4 4 5 6 7 8 9
0 1 2 3 5 5 6 7 8 9
0 1 2 3 4 6 6 7 8 9
0 1 2 3 4 5 7 7 8 9
0 1 2 3 4 5 6 8 8 9
0 1 2 3 4 5 6 7 9 9
0 1 2 3 4 5 6 7 8 10
*/
func main() {
    numbers := []int {0,1,2,3,4,5,6,7,8,9}

    matrix := [][]int{
        {1,0,0,0,0,0,0,0,0,0},
        {0,1,0,0,0,0,0,0,0,0},
        {0,0,1,0,0,0,0,0,0,0},
        {0,0,0,1,0,0,0,0,0,0},
        {0,0,0,0,1,0,0,0,0,0},
        {0,0,0,0,0,1,0,0,0,0},
        {0,0,0,0,0,0,1,0,0,0},
        {0,0,0,0,0,0,0,1,0,0},
        {0,0,0,0,0,0,0,0,1,0},
        {0,0,0,0,0,0,0,0,0,1},
    }

    rmatrix := make([][]int, 10)

    for i, row := range matrix {
        cResult := make(chan []int)
        go func(row []int, numbers []int, c chan <- []int) {
            c <- addRow(row,numbers)
        }(row,numbers,cResult)

        //this read from the channel will block until the goroutine sends its result over the channel
        rmatrix[i] = <- cResult
    }
    fmt.Println(rmatrix)
}

func addRow(row []int, numbers []int) []int{
    result := make([]int, len(row))
    for i,e := range row {
        result[i] = e + numbers[i];
    }
    return result
}

Ответы [ 3 ]

0 голосов
/ 08 мая 2020

Этот пример порождает меньшее количество горутин, а также гарантирует правильный порядок, независимо от того, какая горутина завершила свою обработку первой.

package main

import (
    "fmt"
    "sync"
)

type rowRes struct {
    index  int
    result *[]int
}

func addRow(index int, row []int, numbers []int) rowRes {
    result := make([]int, len(row))
    for i, e := range row {
        result[i] = e + numbers[i]
    }
    return rowRes{
        index:  index,
        result: &result,
    }
}

func main() {
    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

    matrix := [][]int{
        {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    }
    rmatrix := make([][]int, 10)

    // Buffered channel
    rowChan := make(chan rowRes, 10)

    wg := sync.WaitGroup{}

    // Reciever goroutine
    go recv(rowChan, rmatrix)

    for i := range matrix {
        wg.Add(1)
        go func(index int, row []int, w *sync.WaitGroup) {
            rowChan <- addRow(index, row, numbers)
            w.Done()
        }(i, matrix[i], &wg)
    }
    wg.Wait()
    close(rowChan)
    fmt.Println(rmatrix)
}

func recv(res chan rowRes, rmatrix [][]int) {
    for {
        select {
        case k, ok := <-res:
            if !ok {
                return
            }
            rmatrix[k.index] = *k.result
        }
    }
}
0 голосов
/ 08 мая 2020

конвейерный метод


taskChannel := make(chan string,1000); // Set up the task queue
wg := sync.WaitGroup

// Task release
wg.add(1)
go func(&wg,taskChannel) {
      defer wg.Down()
      for i in "task list" {
        taskChannel <- "Stuff the characters you want to deal with here"
      }

    // After the task is sent and closed
    close(taskChannel)
}(wg *sync.WaitGroup,taskChannel chan string)

// Task execution
go func(&wg,taskChannel,1000) {
    defer wg.Down()
    limit := make(chan bool,limitNumber); // Limit the number of concurrent
    tg := sync.WaitGroup
    loop:
    for {
      select {
      case task,over := <-taskChannel:
            if !over {  // If there are no more tasks, quit
                tg.Wait()  // Wait for all tasks to end
                break loop
            }

            tg.Add(1)
            limit<-true
            go func(&tg,limitm) {
                defer func() {
                    <-limit
                    tg.Down()
                }
                // Business processing logic, processing tasks
            }(tg *sync.WaitGroup,limit chan bool,task string)
      }
    }
}(wg *sync.WaitGroup,taskChannel chan string,limitNumber int)

wg.Wait()

Надеюсь вам помочь

0 голосов
/ 08 мая 2020

Мне нужно было использовать sync.WaitGroup и напрямую назначать результаты вызова (чтобы гарантировать, что они go вернутся в свою индексированную строку). Спасибо @ Питер

package main

import (
    "fmt"
    "sync"
)

func main() {
    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

    matrix := [][]int{
        {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
        {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    }

    rmatrix := make([][]int, 10)
    var waitGroup sync.WaitGroup

    for i, row := range matrix {
        waitGroup.Add(1)
        go func(i int, row []int) {
            rmatrix[i] = addRow(row, numbers)
            waitGroup.Done()
        }(i, row)
    }
    waitGroup.Wait()
    fmt.Println(rmatrix)
}

func addRow(row []int, numbers []int) []int {
    result := make([]int, len(row))
    for i, e := range row {
        result[i] = e + numbers[i]
    }
    return result
}

...