Программа Голанга с утечкой памяти - PullRequest
0 голосов
/ 05 мая 2018

Я пытаюсь написать простую программу, которая ведет себя как find | Греп в Голанге. У меня есть программа, все работающие с использованием goroutines, используя следующий шаблон:

goroutine (filech <- каждый файл как найден) goroutine (хранить файлы в расширенных категориях <- grepch) </p>

порядок для каждого файла filech (grepch <- если файл содержит строку) </p>

Это все работает, как и ожидалось, но когда представлено большое количество файлов, память просто продолжает расти и расти. Я изучил некоторые инструменты профилирования, предлагаемые Go, но не мог понять, как найти утечку памяти. Я могу сказать, что память используется в основном bytes.makeSlice.

Может кто-нибудь взглянуть на код ниже и увидеть, что я делаю не так? Кроме того, я хотел бы знать, что не так с моим кодом, но я также хотел бы узнать, как отладить это самостоятельно в будущем, поэтому, если бы вы могли дать подробные инструкции по профилированию для такой проблемы, как эта, это было бы с благодарностью.

package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "regexp"
    "runtime/pprof"
    "strings"
    "sync"
)

var (
    topDir      string
    cProf       bool
    mProf       bool
    cProfFile   *os.File
    mProfFile   *os.File
    fileNames   []string
    fileTypes   []string
    fileLists   map[string][]string
    cMatch      = regexp.MustCompile(`(?i)^.*\.(?:c|h|cc|cpp|c\+\+|hpp)$`)
    javaMatch   = regexp.MustCompile(`(?i)^.*\.(?:java|js)$`)
    goMatch     = regexp.MustCompile(`(?i)^.*\.(?:go)$`)
    buildMatch  = regexp.MustCompile(`(?i)^.*\.(?:gradle|mk|mka)$`)
    buildMatch2 = regexp.MustCompile(`^.*/(?:Makefile[^/\\]*)$`)
    regMatch    = regexp.MustCompile(`(?i)(?:test|debug)`)
)

func init() {
    fileLists = make(map[string][]string)
}

func main() {
    flag.StringVar(&topDir, "d", ".", "The top level directory to process (default is current directory)")
    flag.BoolVar(&cProf, "c", false, "Include if you want to save the CPU profile")
    flag.BoolVar(&mProf, "m", false, "Include if you want to save the MEM profile")
    flag.Parse()

    cProfFunc()

    getFilesChan := make(chan string, 1000)
    grepFilesChan := make(chan string, 100)

    go getFileNamesOverChan(topDir, getFilesChan)

    var fileResult string

    var grepWg sync.WaitGroup
    var categorizeWg sync.WaitGroup

    fileTypes = append(fileTypes, "C", "Java", "Go", "Build", "Uncategorized")

    categorizeWg.Add(1)
    go func(chan string) {
        var grepResult string
        for grepResult = range grepFilesChan {
            if grepResult != "" {
                fmt.Printf("Found file %s with text\n", grepResult)
                var fileType = getFileCategory(grepResult)
                fileLists[fileType] = append(fileLists[fileType], grepResult)
            }
        }
        categorizeWg.Done()
    }(grepFilesChan)

    for fileResult = range getFilesChan {
        if fileResult != "" {
            fileNames = append(fileNames, fileResult)
            grepWg.Add(1)
            go func(file string, ch chan string) {
                fmt.Printf("Grepping file %s\n", file)
                grepOverChan(file, ch)
                grepWg.Done()
            }(fileResult, grepFilesChan)
        }

    }

    grepWg.Wait()
    close(grepFilesChan)
    categorizeWg.Wait()

    printSummary()
    mProfFunc()

    defer pprof.StopCPUProfile()
    defer cProfFile.Close()
}

func cProfFunc() {
    if cProf {
        cProfFile, _ = os.Create("cpu_profile.pprof")
        //handle err
        _ = pprof.StartCPUProfile(cProfFile)
        //handle err
    }

}

func mProfFunc() {
    if mProf {
        mProfFile, _ = os.Create("mem_profile.pprof")
        //handle err
        _ = pprof.WriteHeapProfile(mProfFile)
        //handle err
        defer mProfFile.Close()
    }
}

func printSummary() {
    fmt.Printf("\n\nProcessed %d Files\n\n", len(fileNames))

    fmt.Println("")
    fmt.Println("Found text in the following files:")

    for _, fType := range fileTypes {
        fmt.Printf("Found text in %d %s Files\n", len(fileLists[fType]), fType)
    }
    /*
        for _, fType := range fileTypes {
            if len(fileLists[fType]) > 0 {
                fmt.Println("")
                fmt.Printf("\t%s Files:\n", fType)
            }

            for _, fileName := range fileLists[fType] {
                fmt.Printf("\t\t%s\n", fileName)
            }
        }
    */
}

func getFileNamesOverChan(directory string, ch chan string) {
    fmt.Printf("Finding files in directory %s\n", directory)
    var err error
    var dirInfo os.FileInfo

    dirInfo, err = os.Lstat(directory)
    if err != nil {
        close(ch)
        return
    }

    if !dirInfo.IsDir() {
        close(ch)
        return
    }

    recursiveGetFilesOverChan(directory, ch)

    close(ch)
}

func recursiveGetFilesOverChan(dir string, ch chan string) {
    dirFile, _ := os.Open(dir)
    //handle err
    defer dirFile.Close()

    dirFileInfo, _ := dirFile.Readdir(0)
    //handle err

    for _, file := range dirFileInfo {
        filePath := fmt.Sprintf("%s%c%s", dir, os.PathSeparator, file.Name())
        switch mode := file.Mode(); {
        case mode.IsDir():
            //is a directory ... recurse
            recursiveGetFilesOverChan(filePath, ch)
        case mode.IsRegular():
            //is a regular file ... send it if it is not a CVS or GIT file
            if !strings.Contains(filePath, "/CVS/") && !strings.Contains(filePath, "/.git/") {
                fmt.Printf("Found File %s\n", filePath)
                ch <- filePath
            }
        case mode&os.ModeSymlink != 0:
            //is a symbolic link ... skip it
            continue
        case mode&os.ModeNamedPipe != 0:
            //is a Named Pipe ... skip it
            continue
        }
    }
}

func getFileCategory(file string) string {
    var fileType string

    switch {
    case cMatch.MatchString(file):
        fileType = "C"
    case javaMatch.MatchString(file):
        fileType = "Java"
    case goMatch.MatchString(file):
        fileType = "Go"
    case buildMatch.MatchString(file):
        fileType = "Build"
    case buildMatch2.MatchString(file):
        fileType = "Build"
    default:
        fileType = "Uncategorized"
    }
    return fileType
}

func grepOverChan(f string, ch chan string) {
    fileBytes, _ := ioutil.ReadFile(f)
    if regMatch.Match(fileBytes) {
        ch <- f
    }
}

1 Ответ

0 голосов
/ 07 мая 2018

Основываясь на комментариях @JimB к моему вопросу, я смог понять, что это не утечка памяти, а проблема неограниченного параллелизма. Мой оригинальный код запускал grep для каждого файла, поскольку он встречался без ограничений.

Мне удалось решить эту проблему, ограничив количество файлов, открытых в grep одновременно. Используя пример, предоставленный http://jmoiron.net/blog/limiting-concurrency-in-go/. В этой ссылке они создают semaphoreChannel, который будет принимать только ограниченное количество сообщений. Записать значение в этот канал перед открытием файла и прочитать значение из этого канала после завершения поиска файла. В конце дождитесь, пока канал semaphoreChannel снова заполнится.

Вот рабочий код, который соответствует моему неработающему исходному коду (см. grepConcurrencyLimit и semaphoreChan для соответствующих частей):

package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "regexp"
    "runtime/pprof"
    "strings"
    "sync"
)

var (
    topDir               string
    cProf                bool
    mProf                bool
    cProfFile            *os.File
    mProfFile            *os.File
    fileNames            []string
    fileTypes            []string
    fileLists            map[string][]string
    grepConcurrencyLimit int
    cMatch               = regexp.MustCompile(`(?i)^.*\.(?:c|h|cc|cpp|c\+\+|hpp)$`)
    javaMatch            = regexp.MustCompile(`(?i)^.*\.(?:java|js)$`)
    goMatch              = regexp.MustCompile(`(?i)^.*\.(?:go)$`)
    buildMatch           = regexp.MustCompile(`(?i)^.*\.(?:gradle|mk|mka)$`)
    buildMatch2          = regexp.MustCompile(`^.*/(?:Makefile[^/\\]*)$`)
    regMatch             = regexp.MustCompile(`(?i)(?:test|debug)`)
)

func init() {
    fileLists = make(map[string][]string)
}

func main() {
    flag.StringVar(&topDir, "d", ".", "The top level directory to process (default is current directory)")
    flag.IntVar(&grepConcurrencyLimit, "l", 50, "The limit of number of files to grep at any one time")
    flag.BoolVar(&cProf, "c", false, "Include if you want to save the CPU profile")
    flag.BoolVar(&mProf, "m", false, "Include if you want to save the MEM profile")
    flag.Parse()

    cProfFunc()

    getFilesChan := make(chan string, 1000)
    grepFilesChan := make(chan string, 100)

    // This channel is to ensure that only grepConcurrencyLimit files are ever grepped at any one time
    semaphoreChan := make(chan bool, grepConcurrencyLimit)

    go getFileNamesOverChan(topDir, getFilesChan)

    var fileResult string

    var grepWg sync.WaitGroup
    var categorizeWg sync.WaitGroup

    fileTypes = append(fileTypes, "C", "Java", "Go", "Build", "Uncategorized")

    categorizeWg.Add(1)
    go func(chan string) {
        var grepResult string
        for grepResult = range grepFilesChan {
            if grepResult != "" {
                fmt.Printf("Found file %s with text\n", grepResult)
                var fileType = getFileCategory(grepResult)
                fileLists[fileType] = append(fileLists[fileType], grepResult)
            }
        }
        categorizeWg.Done()
    }(grepFilesChan)

    for fileResult = range getFilesChan {
        if fileResult != "" {
            fileNames = append(fileNames, fileResult)
            grepWg.Add(1)
            // write a boolean to semaphoreChan to take up one of the concurrency limit spots
            semaphoreChan <- true
            go func(file string, ch chan string) {
                fmt.Printf("Grepping file %s\n", file)
                //run the function to read a boolean from semaphoreChan to release one of the concurrency limit spots
                defer func() { <-semaphoreChan }()
                grepOverChan(file, ch)
                grepWg.Done()
            }(fileResult, grepFilesChan)
        }

    }

    // refill semaphoreChan to capacity to wait until all of the final go routines have completed.
    for i := 0; i < cap(semaphoreChan); i++ {
        semaphoreChan <- true
    }

    grepWg.Wait()
    close(grepFilesChan)
    categorizeWg.Wait()

    printSummary()
    mProfFunc()

    defer pprof.StopCPUProfile()
    defer cProfFile.Close()
}

func cProfFunc() {
    if cProf {
        cProfFile, _ = os.Create("cpu_profile.pprof")
        //handle err
        _ = pprof.StartCPUProfile(cProfFile)
        //handle err
    }

}

func mProfFunc() {
    if mProf {
        mProfFile, _ = os.Create("mem_profile.pprof")
        //handle err
        _ = pprof.WriteHeapProfile(mProfFile)
        //handle err
        defer mProfFile.Close()
    }
}

func printSummary() {
    fmt.Printf("\n\nProcessed %d Files\n\n", len(fileNames))

    fmt.Println("")
    fmt.Println("Found text in the following files:")

    for _, fType := range fileTypes {
        fmt.Printf("Found text in %d %s Files\n", len(fileLists[fType]), fType)
    }
    /*
        for _, fType := range fileTypes {
            if len(fileLists[fType]) > 0 {
                fmt.Println("")
                fmt.Printf("\t%s Files:\n", fType)
            }

            for _, fileName := range fileLists[fType] {
                fmt.Printf("\t\t%s\n", fileName)
            }
        }
    */
}

func getFileNamesOverChan(directory string, ch chan string) {
    fmt.Printf("Finding files in directory %s\n", directory)
    var err error
    var dirInfo os.FileInfo

    dirInfo, err = os.Lstat(directory)
    if err != nil {
        close(ch)
        return
    }

    if !dirInfo.IsDir() {
        close(ch)
        return
    }

    recursiveGetFilesOverChan(directory, ch)

    close(ch)
}

func recursiveGetFilesOverChan(dir string, ch chan string) {
    dirFile, _ := os.Open(dir)
    //handle err
    defer dirFile.Close()

    dirFileInfo, _ := dirFile.Readdir(0)
    //handle err

    for _, file := range dirFileInfo {
        filePath := fmt.Sprintf("%s%c%s", dir, os.PathSeparator, file.Name())
        switch mode := file.Mode(); {
        case mode.IsDir():
            //is a directory ... recurse
            recursiveGetFilesOverChan(filePath, ch)
        case mode.IsRegular():
            //is a regular file ... send it if it is not a CVS or GIT file
            if !strings.Contains(filePath, "/CVS/") && !strings.Contains(filePath, "/.git/") {
                fmt.Printf("Found File %s\n", filePath)
                ch <- filePath
            }
        case mode&os.ModeSymlink != 0:
            //is a symbolic link ... skip it
            continue
        case mode&os.ModeNamedPipe != 0:
            //is a Named Pipe ... skip it
            continue
        }
    }
}

func getFileCategory(file string) string {
    var fileType string

    switch {
    case cMatch.MatchString(file):
        fileType = "C"
    case javaMatch.MatchString(file):
        fileType = "Java"
    case goMatch.MatchString(file):
        fileType = "Go"
    case buildMatch.MatchString(file):
        fileType = "Build"
    case buildMatch2.MatchString(file):
        fileType = "Build"
    default:
        fileType = "Uncategorized"
    }
    return fileType
}

func grepOverChan(f string, ch chan string) {
    fileBytes, _ := ioutil.ReadFile(f)
    if regMatch.Match(fileBytes) {
        ch <- f
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...