Прежде чем рассмотреть возможность распараллеливания процесса, вам следует изучить входные данные и вычисления, чтобы убедиться, что они имеют смысл.
Входные данные, которые необходимо обработать по порядку, не являются хорошим соответствием, поскольку для параллельной обработки потребуетсядополнительные сложные инструкции, чтобы поддерживать порядок, сложно заранее оценить, будет ли эта стратегия выигрышной.
Кроме того, чтобы воспользоваться преимуществами распараллеливания, выполнение вычислений должно занимать больше времени, чем требуется длясинхронизировать параллельные задачи. Можно перевесить эту стоимость, увеличив объем данных, но полученный алгоритм будет более сложным и создаст дополнительные нежелательные побочные эффекты (например, распределения).
В противном случае не распараллеливайте.
См. Ниже пример различных реализаций с длительным / коротким временем вычислений и их результирующим эталоном.
Вывод таков, если вы не вычислитепри длительном выполнении асинхронной задачи, которая явно перевешивает затраты на синхронизацию, последовательная обработка выполняется быстрее.
main.go
package main
import (
"bufio"
"fmt"
"io"
"runtime"
"strings"
"sync"
"time"
)
func main() {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
run_line_short(data, true)
run_line_long(data, true)
run_line_short_workers(data, true)
run_line_long_workers(data, true)
run_bulk_short(data, true)
run_bulk_long(data, true)
run_seq_short(data, true)
run_seq_long(data, true)
}
func run_line_short(data string, stat bool) {
if stat {
s := stats("run_line_short")
defer s()
}
r := strings.NewReader(data)
err := process(r, line_handler_short)
if err != nil {
panic(err)
}
}
func run_line_long(data string, stat bool) {
if stat {
s := stats("run_line_long")
defer s()
}
r := strings.NewReader(data)
err := process(r, line_handler_long)
if err != nil {
panic(err)
}
}
func run_line_short_workers(data string, stat bool) {
if stat {
s := stats("run_line_short_workers")
defer s()
}
r := strings.NewReader(data)
err := processWorkers(r, line_handler_short)
if err != nil {
panic(err)
}
}
func run_line_long_workers(data string, stat bool) {
if stat {
s := stats("run_line_long_workers")
defer s()
}
r := strings.NewReader(data)
err := processWorkers(r, line_handler_long)
if err != nil {
panic(err)
}
}
func run_bulk_short(data string, stat bool) {
if stat {
s := stats("run_bulk_short")
defer s()
}
r := strings.NewReader(data)
err := processBulk(r, bulk_handler_short)
if err != nil {
panic(err)
}
}
func run_bulk_long(data string, stat bool) {
if stat {
s := stats("run_bulk_long")
defer s()
}
r := strings.NewReader(data)
err := processBulk(r, bulk_handler_long)
if err != nil {
panic(err)
}
}
func run_seq_short(data string, stat bool) {
if stat {
s := stats("run_seq_short")
defer s()
}
r := strings.NewReader(data)
err := processSeq(r, line_handler_short)
if err != nil {
panic(err)
}
}
func run_seq_long(data string, stat bool) {
if stat {
s := stats("run_seq_long")
defer s()
}
r := strings.NewReader(data)
err := processSeq(r, line_handler_long)
if err != nil {
panic(err)
}
}
func line_handler_short(k string) error {
_ = len(k)
return nil
}
func line_handler_long(k string) error {
<-time.After(time.Millisecond * 5)
_ = len(k)
return nil
}
func bulk_handler_short(b []string) error {
for _, k := range b {
_ = len(k)
}
return nil
}
func bulk_handler_long(b []string) error {
<-time.After(time.Millisecond * 5)
for _, k := range b {
_ = len(k)
}
return nil
}
func stats(name string) func() {
fmt.Printf("======================\n")
fmt.Printf("%v\n", name)
start := time.Now()
return func() {
fmt.Printf("time to run %v\n", time.Since(start))
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
ms.Alloc/1024/1024, ms.TotalAlloc/1024/1024, ms.Sys/1024/1024)
fmt.Printf("Mallocs: %d, Frees: %d\n",
ms.Mallocs, ms.Frees)
fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
ms.HeapAlloc/1024/1024, ms.HeapSys/1024/1024, ms.HeapIdle/1024/1024)
fmt.Printf("HeapObjects: %d\n", ms.HeapObjects)
fmt.Printf("\n")
}
}
func process(r io.Reader, h func(string) error) error {
errs := make(chan error)
workers := make(chan struct{}, 4)
var wg sync.WaitGroup
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
workers <- struct{}{} // acquire a token
wg.Add(1)
go func(line string) {
defer wg.Done()
if err := h(line); err != nil {
errs <- err
}
<-workers
}(scanner.Text())
}
wg.Wait()
if err := scanner.Err(); err != nil {
errs <- err
}
close(errs)
}()
var err error
for e := range errs {
if e != nil && err == nil {
err = e
}
}
return err
}
func processWorkers(r io.Reader, h func(string) error) error {
errs := make(chan error)
input := make(chan string)
y := 4
var wg sync.WaitGroup
for i := 0; i < y; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for line := range input {
if err := h(line); err != nil {
errs <- err
}
}
}()
}
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
input <- scanner.Text()
}
close(input)
wg.Wait()
if err := scanner.Err(); err != nil {
errs <- err
}
close(errs)
}()
var err error
for e := range errs {
if err == nil && e != nil {
err = e
}
}
return err
}
func processBulk(r io.Reader, h func([]string) error) error {
errs := make(chan error)
input := make(chan []string)
y := 4
var wg sync.WaitGroup
for i := 0; i < y; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for bulk := range input {
if err := h(bulk); err != nil {
errs <- err
}
}
}()
}
go func() {
scanner := bufio.NewScanner(r)
l := 50
bulk := make([]string, l)
i := 0
for scanner.Scan() {
text := scanner.Text()
bulk[i] = text
i++
if i == l {
copied := make([]string, l, l)
copy(copied, bulk)
i = 0
input <- copied
}
}
if len(bulk) > 0 {
input <- bulk
}
close(input)
if err := scanner.Err(); err != nil {
errs <- err
}
}()
go func() {
wg.Wait()
close(errs)
}()
var err error
for e := range errs {
if err == nil && e != nil {
err = e
}
}
return err
}
func processSeq(r io.Reader, h func(string) error) error {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
text := scanner.Text()
if err := h(text); err != nil {
return err
}
}
return scanner.Err()
}
main_test.go
package main
import (
"strings"
"testing"
)
func Benchmark_run_line_short(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_line_short(data, false)
}
}
func Benchmark_run_line_long(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_line_long(data, false)
}
}
func Benchmark_run_line_short_workers(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_line_short_workers(data, false)
}
}
func Benchmark_run_line_long_workers(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_line_long_workers(data, false)
}
}
func Benchmark_run_bulk_short(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_bulk_short(data, false)
}
}
func Benchmark_run_bulk_long(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_bulk_long(data, false)
}
}
func Benchmark_run_seq_short(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_seq_short(data, false)
}
}
func Benchmark_run_seq_long(b *testing.B) {
data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
for i := 0; i < b.N; i++ {
run_seq_long(data, false)
}
}
результаты
$ go run main.go
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377
======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614
======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905
======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923
======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944
======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367
======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586
======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793
[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4 1000 1750824 ns/op 1029354 B/op 1005 allocs/op
Benchmark_run_line_short-4 1000 1747408 ns/op 1029348 B/op 1005 allocs/op
Benchmark_run_line_short-4 1000 1757826 ns/op 1029352 B/op 1005 allocs/op
Benchmark_run_line_short-4 1000 1758427 ns/op 1029352 B/op 1005 allocs/op
Benchmark_run_line_long-4 1 1303037704 ns/op 2253776 B/op 4075 allocs/op
Benchmark_run_line_long-4 1 1305074974 ns/op 2247792 B/op 4032 allocs/op
Benchmark_run_line_long-4 1 1305353658 ns/op 2246320 B/op 4013 allocs/op
Benchmark_run_line_long-4 1 1305725817 ns/op 2247792 B/op 4031 allocs/op
Benchmark_run_line_short_workers-4 1000 2148354 ns/op 1029366 B/op 1005 allocs/op
Benchmark_run_line_short_workers-4 1000 2139629 ns/op 1029370 B/op 1005 allocs/op
Benchmark_run_line_short_workers-4 1000 1983352 ns/op 1029359 B/op 1005 allocs/op
Benchmark_run_line_short_workers-4 1000 1909968 ns/op 1029363 B/op 1005 allocs/op
Benchmark_run_line_long_workers-4 1 1298321093 ns/op 2247856 B/op 4013 allocs/op
Benchmark_run_line_long_workers-4 1 1299846127 ns/op 2246384 B/op 4012 allocs/op
Benchmark_run_line_long_workers-4 1 1300003625 ns/op 2246288 B/op 4011 allocs/op
Benchmark_run_line_long_workers-4 1 1302779911 ns/op 2246256 B/op 4011 allocs/op
Benchmark_run_bulk_short-4 2000 704358 ns/op 1082154 B/op 1011 allocs/op
Benchmark_run_bulk_short-4 2000 708563 ns/op 1082147 B/op 1011 allocs/op
Benchmark_run_bulk_short-4 2000 714687 ns/op 1082148 B/op 1011 allocs/op
Benchmark_run_bulk_short-4 2000 705546 ns/op 1082156 B/op 1011 allocs/op
Benchmark_run_bulk_long-4 50 31411412 ns/op 1051497 B/op 1088 allocs/op
Benchmark_run_bulk_long-4 50 31513018 ns/op 1051544 B/op 1088 allocs/op
Benchmark_run_bulk_long-4 50 31539311 ns/op 1051502 B/op 1088 allocs/op
Benchmark_run_bulk_long-4 50 31564940 ns/op 1051505 B/op 1088 allocs/op
Benchmark_run_seq_short-4 2000 574346 ns/op 1028632 B/op 1002 allocs/op
Benchmark_run_seq_short-4 3000 572857 ns/op 1028464 B/op 1002 allocs/op
Benchmark_run_seq_short-4 2000 580493 ns/op 1028632 B/op 1002 allocs/op
Benchmark_run_seq_short-4 3000 572240 ns/op 1028464 B/op 1002 allocs/op
Benchmark_run_seq_long-4 1 5196313302 ns/op 2245792 B/op 4005 allocs/op
Benchmark_run_seq_long-4 1 5199995649 ns/op 2245792 B/op 4005 allocs/op
Benchmark_run_seq_long-4 1 5200460425 ns/op 2245792 B/op 4005 allocs/op
Benchmark_run_seq_long-4 1 5201080570 ns/op 2245792 B/op 4005 allocs/op
PASS
ok test/bulk 68.944s
примечания: к моему удивлению, run_line_short_workers
немного медленнее, чем run_line_short
, я не объясняю этот результат,однако более глубокий анализ с использованием pprof должен дать ответ.