Сериализация goroutines (распараллелить, но гарантировать порядок) - PullRequest
0 голосов
/ 03 июля 2018

Допустим, мы хотим обрабатывать некоторые вычисления параллельно, но мы должны гарантировать, что порядок результатов такой же, как порядок вычислений:

Это можно сделать, например:

https://play.golang.org/p/jQbo0EVLzvX

package main

import (
    "fmt"
    "time"
)

func main() {
    orderPutChans := make([]chan bool, 8)
    orderGetChans := make([]chan bool, 8)
    doneChans := make([]chan bool, 8)

    for i := 0; i < 8; i++ {
        orderPutChans[i] = make(chan bool, 1)
        orderGetChans[i] = make(chan bool)
        doneChans[i] = make(chan bool)
    }

    srcCh := make(chan int)
    dstCh := make(chan int)

    for i := 0; i < 8; i++ {
        go func(j int) {
            myGetCh := orderGetChans[j]
            nextGetCh := orderGetChans[(j+1) % 8]
            myPutCh := orderPutChans[j]
            nextPutCh := orderPutChans[(j+1) % 8]

            for {
                _ = <- myGetCh

                v, ok := <- srcCh

                if !ok {
                    k := (j + 1) % 8
                    if orderGetChans[k] != nil {
                            orderGetChans[k] <- true
                    }
                    orderGetChans[j] = nil

                    break
                }

                nextGetCh <- true

                time.Sleep(1000)

                v *= v

                _ = <- myPutCh

                dstCh <- v

                nextPutCh <- true
            }

            doneChans[j] <- true
        }(i)
    }

    go func() {
        for i := 0; i < 8; i++ {
            _ = <- doneChans[i]
        }
        close(dstCh)
    }()

    orderGetChans[0] <- true
    orderPutChans[0] <- true

    go func() {
        for i := 0; i < 100; i++ {
            srcCh <- i
        }
        close(srcCh)
    }()

    for vv := range dstCh {
        fmt.Println(vv)
    }
}

Можно использовать каналы для передачи прав на чтение / запись для каналов. Код грязный и выглядит не очень аккуратно. Есть ли более чистый способ достичь этого в Go?

Редактировать : Я не прошу «простых» замен, таких как chan struct{} или close на doneChans в пользу doneChans[i] <- true.

Edit2

Гораздо более простой подход (по крайней мере, с точки зрения кода) состоит в том, чтобы иметь массив results, и потребитель отправляет данные вместе с индексом (который будет представлять собой число работников) и записывает goroutines результат в results[j], а затем есть группа WaitGroup, которая будет ждать, пока все не будет сделано (с одним пакетом из множества пакетов), а затем перебрать результаты и отправить их в канал назначения. (Может быть, не так хорошо из-за ложного обмена?)

1 Ответ

0 голосов
/ 03 июля 2018

Если я правильно понимаю, это версия вашего кода, которая использует стиль "конвейер". Где есть несколько шагов в конвейере:

  1. Отправка значений src
  2. Рабочие, которые работают с полученными значениями src, отправляют на свой собственный канал результатов
  3. Объединение фрагментов каналов результатов от рабочих в один неупорядоченный канал
  4. Заказ неупорядоченных значений из неупорядоченного объединенного канала

Вот код, он использует стиль индексированных пар, который вы упомянули в редактировании вашего исходного вопроса.

type idxPair struct {
    idx, val int
}

func main() {
    // add a done channel, an ability to stop the world by closing this.
    done := make(chan struct{})
    defer close(done)

    // create srcChan, this will be where the values go into the pipeline
    srcCh := make(chan idxPair)

    // create a slice of result channels, one for each of the go workers
    const numWorkers = 8
    resChans := make([]<-chan idxPair, numWorkers)

    // waitgroup to wait for all the workers to stop
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // start the workers, passing them each the src channel,
    // collecting the result channels they return
    for i := 0; i < numWorkers; i++ {
        resChans[i]  = worker(done, &wg, srcCh)
    }

    // start a single goroutine to send values into the pipeline
    // all values are sent with an index, to be pieces back into order at the end.
    go func() {
        defer close(srcCh)
        for i := 1; i < 100; i++ {
            srcCh <- idxPair{idx: i, val: i}
        }
    }()

    // merge all the results channels into a single results channel
    // this channel is unordered.
    mergedCh := merge(done, resChans...)

    // order the values coming from the mergedCh according the the idxPair.idx field.
    orderedResults := order(100, mergedCh)

    // iterate over each of the ordered results
    for _, v := range orderedResults {
        fmt.Println(v)
    }
}

func order(len int, res <-chan idxPair) []int {
    results := make([]int, len)

    // collect all the values to order them
    for r := range res {
        results[r.idx] = r.val
    }

    return results
}

func worker(done <- chan struct{}, wg *sync.WaitGroup, src <-chan idxPair) <-chan idxPair {
    res := make(chan idxPair)

    go func() {
        defer wg.Done()
        defer close(res)
        sendValue := func(pair idxPair) {
            v := pair.val
            v *= v
            ip := idxPair{idx: pair.idx, val: v}
            select {
            case res <- ip:
            case <-done:
            }
        }

        for v := range src{
             sendValue(v)
        }
    }()

    return res
}


// example and explanation here: https://blog.golang.org/pipelines
func merge(done <-chan struct{}, cs ...<-chan idxPair) <-chan idxPair {
    var wg sync.WaitGroup
    out := make(chan idxPair)

    output := func(c <-chan idxPair) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Причина, по которой я считаю, что это немного чище, а не просто "отличается ради", заключается в том, что:

  1. Вы можете моделировать и реализовывать каждый из этапов независимо. Этап order может быть легко оптимизирован для отправки значений через канал, когда они получены и т. Д.
  2. Это гораздо сложнее; вместо одного большого метода, который работает с несколькими каналами, хранящимися в массивах, вы можете выполнять асинхронную работу над элементами и оставлять порядок как что-то, что лишает ответственности. Это способствует повторному использованию.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...