Как реализовать параллельную обработку строкового содержимого файла - PullRequest
0 голосов
/ 30 сентября 2019

Я пишу POC для обработки очень большого текстового файла ~ 1 млрд. + Строк и экспериментирую с Go для этого;

package main

import (
        "bufio"
        "fmt"
        "log"
        "os"
        "time"
)

func main() {
        start := time.Now()
        file, err := os.Open("dump10.txt")
        if err != nil {
                log.Fatal(err)
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
                go fmt.Println(scanner.Text())
        }

        if err := scanner.Err(); err != nil {
                log.Fatal(err)
        }
        secs := time.Since(start).Seconds()
        fmt.Printf("Took %.2fs", secs)
}

Однако при запуске этого я получаю эту ошибку;

паника: слишком много одновременных операций с одним файлом или сокетом (макс. 1048575)

Я не нашел в сети ничего такого, что могло бы решить эту конкретную ошибку. Я не уверен, что это проблема с файловыми дескрипторами. Максимальное значение, указанное в ошибке, намного превышает мой ulimit -n предел в 500 000

Как лучше всего это сделать?

Поскольку это не очевидно, fmt.Println является заменой для фактической функции, которую я буду вызывать при обработке данных.

Ответы [ 3 ]

2 голосов
/ 30 сентября 2019

Прежде чем рассмотреть возможность распараллеливания процесса, вам следует изучить входные данные и вычисления, чтобы убедиться, что они имеют смысл.

Входные данные, которые необходимо обработать по порядку, не являются хорошим соответствием, поскольку для параллельной обработки потребуетсядополнительные сложные инструкции, чтобы поддерживать порядок, сложно заранее оценить, будет ли эта стратегия выигрышной.

Кроме того, чтобы воспользоваться преимуществами распараллеливания, выполнение вычислений должно занимать больше времени, чем требуется длясинхронизировать параллельные задачи. Можно перевесить эту стоимость, увеличив объем данных, но полученный алгоритм будет более сложным и создаст дополнительные нежелательные побочные эффекты (например, распределения).

В противном случае не распараллеливайте.

См. Ниже пример различных реализаций с длительным / коротким временем вычислений и их результирующим эталоном.

Вывод таков, если вы не вычислитепри длительном выполнении асинхронной задачи, которая явно перевешивает затраты на синхронизацию, последовательная обработка выполняется быстрее.

main.go

package main

import (
    "bufio"
    "fmt"
    "io"
    "runtime"
    "strings"
    "sync"
    "time"
)

func main() {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    run_line_short(data, true)
    run_line_long(data, true)
    run_line_short_workers(data, true)
    run_line_long_workers(data, true)
    run_bulk_short(data, true)
    run_bulk_long(data, true)
    run_seq_short(data, true)
    run_seq_long(data, true)
}

func run_line_short(data string, stat bool) {
    if stat {
        s := stats("run_line_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := process(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_line_long(data string, stat bool) {
    if stat {
        s := stats("run_line_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := process(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_line_short_workers(data string, stat bool) {
    if stat {
        s := stats("run_line_short_workers")
        defer s()
    }
    r := strings.NewReader(data)
    err := processWorkers(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_line_long_workers(data string, stat bool) {
    if stat {
        s := stats("run_line_long_workers")
        defer s()
    }
    r := strings.NewReader(data)
    err := processWorkers(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_bulk_short(data string, stat bool) {
    if stat {
        s := stats("run_bulk_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := processBulk(r, bulk_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_bulk_long(data string, stat bool) {
    if stat {
        s := stats("run_bulk_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := processBulk(r, bulk_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_seq_short(data string, stat bool) {
    if stat {
        s := stats("run_seq_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := processSeq(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_seq_long(data string, stat bool) {
    if stat {
        s := stats("run_seq_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := processSeq(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}

func line_handler_short(k string) error {
    _ = len(k)
    return nil
}

func line_handler_long(k string) error {
    <-time.After(time.Millisecond * 5)
    _ = len(k)
    return nil
}

func bulk_handler_short(b []string) error {
    for _, k := range b {
        _ = len(k)
    }
    return nil
}

func bulk_handler_long(b []string) error {
    <-time.After(time.Millisecond * 5)
    for _, k := range b {
        _ = len(k)
    }
    return nil
}

func stats(name string) func() {
    fmt.Printf("======================\n")
    fmt.Printf("%v\n", name)
    start := time.Now()
    return func() {
        fmt.Printf("time to run %v\n", time.Since(start))
        var ms runtime.MemStats
        runtime.ReadMemStats(&ms)
        fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
            ms.Alloc/1024/1024, ms.TotalAlloc/1024/1024, ms.Sys/1024/1024)
        fmt.Printf("Mallocs: %d, Frees: %d\n",
            ms.Mallocs, ms.Frees)
        fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
            ms.HeapAlloc/1024/1024, ms.HeapSys/1024/1024, ms.HeapIdle/1024/1024)
        fmt.Printf("HeapObjects: %d\n", ms.HeapObjects)
        fmt.Printf("\n")
    }
}

func process(r io.Reader, h func(string) error) error {
    errs := make(chan error)
    workers := make(chan struct{}, 4)
    var wg sync.WaitGroup
    go func() {
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            workers <- struct{}{} // acquire a token
            wg.Add(1)
            go func(line string) {
                defer wg.Done()
                if err := h(line); err != nil {
                    errs <- err
                }
                <-workers
            }(scanner.Text())
        }
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if e != nil && err == nil {
            err = e
        }
    }
    return err
}

func processWorkers(r io.Reader, h func(string) error) error {
    errs := make(chan error)
    input := make(chan string)
    y := 4
    var wg sync.WaitGroup
    for i := 0; i < y; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range input {
                if err := h(line); err != nil {
                    errs <- err
                }
            }
        }()
    }
    go func() {
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            input <- scanner.Text()
        }
        close(input)
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

func processBulk(r io.Reader, h func([]string) error) error {
    errs := make(chan error)
    input := make(chan []string)
    y := 4
    var wg sync.WaitGroup
    for i := 0; i < y; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for bulk := range input {
                if err := h(bulk); err != nil {
                    errs <- err
                }
            }
        }()
    }
    go func() {
        scanner := bufio.NewScanner(r)
        l := 50
        bulk := make([]string, l)
        i := 0
        for scanner.Scan() {
            text := scanner.Text()
            bulk[i] = text
            i++
            if i == l {
                copied := make([]string, l, l)
                copy(copied, bulk)
                i = 0
                input <- copied
            }
        }
        if len(bulk) > 0 {
            input <- bulk
        }
        close(input)
        if err := scanner.Err(); err != nil {
            errs <- err
        }
    }()
    go func() {
        wg.Wait()
        close(errs)
    }()
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

func processSeq(r io.Reader, h func(string) error) error {
    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        text := scanner.Text()
        if err := h(text); err != nil {
            return err
        }
    }
    return scanner.Err()
}

main_test.go

package main

import (
    "strings"
    "testing"
)

func Benchmark_run_line_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_short(data, false)
    }
}

func Benchmark_run_line_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_long(data, false)
    }
}
func Benchmark_run_line_short_workers(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_short_workers(data, false)
    }
}
func Benchmark_run_line_long_workers(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_long_workers(data, false)
    }
}
func Benchmark_run_bulk_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_bulk_short(data, false)
    }
}
func Benchmark_run_bulk_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_bulk_long(data, false)
    }
}
func Benchmark_run_seq_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_seq_short(data, false)
    }
}
func Benchmark_run_seq_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_seq_long(data, false)
    }
}

результаты

$ go run main.go 
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377

======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614

======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905

======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923

======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944

======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367

======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586

======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793

[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4                  1000       1750824 ns/op     1029354 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1747408 ns/op     1029348 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1757826 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1758427 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_long-4                      1    1303037704 ns/op     2253776 B/op       4075 allocs/op
Benchmark_run_line_long-4                      1    1305074974 ns/op     2247792 B/op       4032 allocs/op
Benchmark_run_line_long-4                      1    1305353658 ns/op     2246320 B/op       4013 allocs/op
Benchmark_run_line_long-4                      1    1305725817 ns/op     2247792 B/op       4031 allocs/op
Benchmark_run_line_short_workers-4          1000       2148354 ns/op     1029366 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       2139629 ns/op     1029370 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1983352 ns/op     1029359 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1909968 ns/op     1029363 B/op       1005 allocs/op
Benchmark_run_line_long_workers-4              1    1298321093 ns/op     2247856 B/op       4013 allocs/op
Benchmark_run_line_long_workers-4              1    1299846127 ns/op     2246384 B/op       4012 allocs/op
Benchmark_run_line_long_workers-4              1    1300003625 ns/op     2246288 B/op       4011 allocs/op
Benchmark_run_line_long_workers-4              1    1302779911 ns/op     2246256 B/op       4011 allocs/op
Benchmark_run_bulk_short-4                  2000        704358 ns/op     1082154 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        708563 ns/op     1082147 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        714687 ns/op     1082148 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        705546 ns/op     1082156 B/op       1011 allocs/op
Benchmark_run_bulk_long-4                     50      31411412 ns/op     1051497 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31513018 ns/op     1051544 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31539311 ns/op     1051502 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31564940 ns/op     1051505 B/op       1088 allocs/op
Benchmark_run_seq_short-4                   2000        574346 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572857 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   2000        580493 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572240 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_long-4                       1    5196313302 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5199995649 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5200460425 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5201080570 ns/op     2245792 B/op       4005 allocs/op
PASS
ok      test/bulk   68.944s

примечания: к моему удивлению, run_line_short_workers немного медленнее, чем run_line_short, я не объясняю этот результат,однако более глубокий анализ с использованием pprof должен дать ответ.

1 голос
/ 30 сентября 2019

В настоящее время все, что делает ваш пример, это асинхронная печать значений - и даже не это, поскольку функция печати должна синхронизировать печать с выводом.

В то же время вы не печатаете все строки вфайл. Основная рутина запускает много горутинов, но не дожидается их окончания. Некоторые будут бежать, а некоторые нет. Чтобы дождаться окончания процедуры, используйте sync.WaitGroup.

Вот пример. Это также может решить проблему с дескриптором файла.

    wg := &sync.WaitGroup{}
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
            text := scanner.Text()
            wg.Add(1)
            go func(t string) {
                    fmt.Println(t)
                    wg.Done()
            }(text)
    }

    wg.Wait()

Обратите внимание, что строки не будут обработаны по порядку! Если вам нужно обработать их по порядку, но не хотите обрабатывать их в чтении goroutine, вам нужен канал и одна goroutine для их обработки.

0 голосов
/ 30 сентября 2019

Это помогло с ошибкой слишком большого числа одновременных операций. Я использую канал, чтобы ограничить число одновременных операций чтения до 100000.

package main

import (
        "bufio"
        "fmt"
        "log"
        "os"
        "time"
)

var tokens = make(chan struct{}, 100000)

func main() {
        start := time.Now()
        file, err := os.Open("dump10.txt")
        if err != nil {
                log.Fatal(err)
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
                tokens <- struct{}{} // acquire a token
                text := scanner.Text()
                go func () {
                    fmt.Printf("%s\n",text)
                    <-tokens
                }()
        }

        if err := scanner.Err(); err != nil {
                log.Fatal(err)
        }
        secs := time.Since(start).Seconds()
        fmt.Printf("Took %.2fs", secs)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...