Как повысить производительность каналов для большого количества задач и быстрых воркеров - PullRequest
0 голосов
/ 20 июня 2020

Я хочу эффективно решить проблему, которая в принципе аналогична нахождению среднего попарного расстояния P точек в пространстве, что является примером, который я использую для этого вопроса. Вычисления можно хорошо распараллелить, поэтому я хотел решить это с помощью Go. В последовательной программе мне нужно было бы запустить два вложенных цикла: внешний - над i = 0...P-1, а внутренний - над j = i+1...P-1. Затем я бы вычислил расстояние между точками i и j, просуммировал их все и в конце разделил на количество пар точек. Следовательно, вычисления должны охватывать «треугольник» возможных комбинаций пар точек. . Мой подход выглядит следующим образом:

package main

import "math"
import "sync"
import "fmt"
import "math/rand"
import "github.com/schollz/progressbar"

const nProcs = 32
const nPoints = 30000

type Pair struct {
    p1 [3]float64
    p2 [3]float64
}

func square(f float64) (float64) {
    return f * f
}

func progress(total int64, ch <-chan int) {
    bar := progressbar.Default(total)
    for i := range ch {
        bar.Add(i)
    }
}

func worker(idx int, sumBuffer []float64, in <-chan Pair, out chan<- int, wg *sync.WaitGroup) {
    count := int64(0)
    
    defer wg.Done();
    
    for pair := range in {
        dist := math.Sqrt(square(pair.p1[0]-pair.p2[0]) + square(pair.p1[1]-pair.p2[1]) + square(pair.p1[2]-pair.p2[2]))
        sumBuffer[idx] += dist
        
        count++
        if count % (2<<15) == 0 {
            out <- (2<<15)
        }
    }
}

func main() {
    var sumBuffer [nProcs]float64
    var points    [nPoints][3]float64
    
    var sum float64
    
    for i := 0; i < nPoints; i++ {
        for j := 0; j < 3; j++ {
            points[i][j] = rand.Float64()
        }
    }
    
    wg := &sync.WaitGroup{};
    wg.Add(nProcs);
    
    progressCh := make(chan int, 4 * nProcs)
    pairCh := make(chan Pair, 4 * nProcs)
    nPairs := int64(nPoints - 1) * int64(nPoints) / int64(2)
    go progress(nPairs, progressCh)
    
    for i := 0; i < nProcs; i++ {
        go worker(i, sumBuffer[:], pairCh, progressCh, wg)
    }
    
    for i := int64(0); i < nPoints; i++ {
        for j := int64(i+1); j < nPoints; j++ {
            pairCh <- Pair{points[i], points[j]}
        }
    }
    
    close(pairCh)
    wg.Wait();
    
    for i := 0; i < nProcs; i++ {
        sum += sumBuffer[i]
    }
    sum /= float64(nPairs)
    
    fmt.Println("Average distance:", sum)
}

Однако эта программа не работала так быстро, как я ожидал. Во второй попытке я избавился от каналов для распределения задач и вместо этого вручную разделил вычисления между рабочими. Затем каждый рабочий должен сначала рассчитать, какую часть «треугольника» он должен покрыть, что довольно громоздко.

package main

import "math"
import "sync"
import "fmt"
import "math/rand"
import "github.com/schollz/progressbar"

const nProcs = 32
const nPoints = 30000

func square(f float64) (float64) {
    return f * f
}

func progress(total int64, ch <-chan int) {
    bar := progressbar.Default(total)
    for i := range ch {
        bar.Add(i)
    }
}

func worker(idx int, points [][3]float64, sumBuffer []float64, start int64, stop int64, out chan<- int, wg *sync.WaitGroup) {
    count := start
    length := int64(len(points))
    
    defer wg.Done();
    
    // Calculate start value for loop index i
    iStart := int64(0)
    pointCount := int64(0)
    for k := length - 1; k >= 0; k-- {
        if pointCount + k > start {
            break
        } else {
            pointCount += k
            iStart++
        }
    }
    firstLoop := true
        
    for i := int64(iStart); i < length && count < stop; i++ {
        // Calculate start value for loop index j
        var jStart int64
        if firstLoop {
            jStart = (i + 1) + (start - pointCount)
        } else {
            jStart = i + 1
        }
        
        for j := int64(jStart); j < length && count < stop; j++ {
            dist := math.Sqrt(square(points[i][0]-points[j][0]) + square(points[i][1]-points[j][1]) + square(points[i][2]-points[j][2]))
            sumBuffer[idx] += dist
            
            count++
            if count % (2<<15) == 0 {
                out <- (2<<15)
            }
        }
        
        firstLoop = false
    }
}

func main() {
    var sumBuffer [nProcs]float64
    var points    [nPoints][3]float64
    
    var sum float64
    
    for i := 0; i < nPoints; i++ {
        for j := 0; j < 3; j++ {
            points[i][j] = rand.Float64()
        }
    }
    
    wg := &sync.WaitGroup{};
    wg.Add(nProcs);
    
    progressCh := make(chan int, 4 * nProcs)
    nPairs := int64(nPoints - 1) * int64(nPoints) / int64(2)
    go progress(nPairs, progressCh)
    
    step := int64(math.Ceil(float64(nPairs) / float64(nProcs)))
    for i := 0; i < nProcs; i++ {
        go worker(i, points[:], sumBuffer[:], int64(i) * step, int64(i+1) * step, progressCh, wg)
    }

    wg.Wait();
    
    for i := 0; i < nProcs; i++ {
        sum += sumBuffer[i]
    }
    sum /= float64(nPairs)
    
    fmt.Println("Average distance:", sum)
}

Теперь вторая программа работает примерно в 100 раз быстрее! Однако я даже не использую преимущества Go в этой версии, и я мог бы написать ту же программу на C ++. Как можно улучшить первую программу так, чтобы накладные расходы, связанные с использованием каналов, не были столь значительными c? Или это просто предел эффективности каналов, и для моего варианта использования каналы просто не подходят для go?

Кроме того, я новичок в Go. Я уверен, что моя программа не совсем идеоматична c для Go. Приветствуются любые комментарии о том, как улучшить свой стиль.

1 Ответ

0 голосов
/ 22 июня 2020

Канальная версия, вероятно, медленнее, потому что работа (вычисление расстояния) очень мала по сравнению с накладными расходами на планирование и обмен данными между горутинами.

Лучше разделить пары точек на более крупные куски, как вы сделал во втором примере.

Вот упрощенный c подход, чтобы увидеть, что происходит:

var sum float64
var count int64

for i, p := range points {
    for _, p2 := range points[i+1:] {
        sum += p.distanceTo(p2)
        count++
    }
}

fmt.Printf("%d points, %d pairs, avg %f\n", len(points), count, sum/float64(count))

Когда я пробую это, мой ноутбук нагревается, а затем выводит:

100000 points, 4999950000 pairs, avg 0.660843
duration 22.623835855s

Каждое вычисление расстояния занимает около 5 наносекунд, это очень быстро и слишком мало для разделения работы.

Только одно ядро ​​процессора выполняет работу:

single cpu

Since my laptop has 8 cores, I tried to create an example with 8 workers working on equal chunks of the point pairs.

8 workers, 100000 points, 4999950000 pairs, avg 0.660843
duration 7.481476421s

It's not 8 x faster, but it's an improvement, and puts those idle cores to work:

enter image description here

Example:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "runtime"
    "time"
)

const nPoints = 100000

var points [nPoints]*Point

type Point struct {
    x, y, z float64
}

func (p *Point) distanceTo(p2 *Point) float64 {
    a := p.x - p2.x
    b := p.y - p2.y
    c := p.z - p2.z
    return math.Sqrt(a*a + b*b + c*c)
}

func main() {
    initPoints()

    start := time.Now()
    run()
    fmt.Printf("duration %v\n", time.Since(start))
}

func initPoints() {
    for i := 0; i < nPoints; i++ {
        points[i] = &Point{rand.Float64(), rand.Float64(), rand.Float64()}
    }
}

func run() {
    requests := make(chan Request)
    results := make(chan Result)

    nWorkers := runtime.NumCPU() //change, eg: 1 or 2

    for n := 0; n < nWorkers; n++ {
        go worker(requests, results)
    }

    go func() {
        nPairs := nPoints * (nPoints - 1) / 2
        batchSize := nPairs / nWorkers

        req := Request{iStart: 0, jStart: 1, count: 0}

        for i := 0; i < nPoints; i++ {
            for j := i + 1; j < nPoints; j++ {
                req.count++
                if req.count == batchSize {
                    requests <- req
                    req = Request{iStart: i, jStart: j, count: 0}
                }
            }
        }

        if req.count > 0 {
            requests <- req
        }

        close(requests)
    }()

    var sum, avg float64
    var count int64

    for n := 0; n < nWorkers; n++ {
        r := <-results
        fmt.Printf("worker %d: %v\n", n+1, r)
        sum += r.sum
        count += r.count
    }
    close(results)

    if count > 0 {
        avg = sum / float64(count)
    }

    fmt.Printf("%d workers, %d points, %d pairs, avg %f\n", nWorkers, len(points), count, avg)
}

func worker(requests chan Request, results chan Result) {
    r := Result{}
    for req := range requests {
        req.work(&r)
    }

    results <- r
}

func (req *Request) work(result *Result) {
    count := 0
    jStart := req.jStart

    for i := req.iStart; i < nPoints; i++ {
        p := points[i]

        if i > req.iStart {
            jStart = i + 1
        }

        for j := jStart; j < nPoints; j++ {
            p2 := points[j]

            result.sum += p.distanceTo(p2)
            result.count++

            count++
            if count == req.count {
                return
            }
        }
    }
}

type Request struct {
    iStart int
    jStart int
    count  int
}

type Result struct {
    sum   float64
    count int64
}

func (r Result) String() string {
    avg := 0.0
    if r.count > 0 {
        avg = r.sum / float64(r.count)
    }
    return fmt.Sprintf("%d comparisons, avg distance %f", r.count, avg)
}

The Сообщение в блоге pprof profiling - отличная демонстрация методов, позволяющих увидеть, на что программа тратит свое время.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...