Итак, я пытаюсь запустить ananlysis на Git коммитах. Я загружаю все репозитории github org и для каждого репо я запускаю отдельную подпрограмму для каждого репо для повторения своих коммитов и отправляю все эти коммиты через их соответствующие каналы вызывающему методу, где все эти коммит-каналы объединяются и все коммиты Структуры отправляются на один канал, с которого они впоследствии используются и записываются в Elasticsearch.
Так выглядит мой код
func (c *Client) GetGitData(ctx context.Context) {
start := time.Now()
repos := c.ghCollector.GetRepos(ctx)
var channels []<-chan *sources.CommitDetails
for _, repo := range repos {
channels = append(channels, c.gitCollector.GetCommits(ctx, repo))
}
sink := mergeCommitChannels(channels)
for doc := range sink {
if doc.Err != nil {
log.WithFields(logrus.Fields{
"type": "client",
}).Error(doc.Err)
continue
}
c.esSink.Write(ctx, "commits", doc)
}
elapsed := time.Since(start)
log.Info("took time: ", elapsed)
}
func mergeCommitChannels(outputsChan []<-chan *sources.CommitDetails) <-chan *sources.CommitDetails {
var wg sync.WaitGroup
merged := make(chan *sources.CommitDetails)
wg.Add(len(outputsChan))
output := func(commitChan <-chan *sources.CommitDetails) {
for commit := range commitChan {
merged <- commit
}
wg.Done()
}
for _, optChan := range outputsChan {
go output(optChan)
}
// run goroutine to close merged channel once done
go func() {
// wait until WaitGroup finishes
wg.Wait()
close(merged)
}()
return merged
}
Именно таков метод итерации фиксации, находящийся в другой пакет выглядит как
func (gc *GitCollector) GetCommits(ctx context.Context, repo RepoDetails) <-chan *CommitDetails {
// clones the repo if not locally present otherwise opens it
repository := gc.CloneRepo(ctx, repo)
commits, err := repository.CommitObjects()
if err != nil {
log.WithFields(logrus.Fields{
"type": "sources-git",
"repo": repo.Name,
}).Error(err)
}
out := make(chan *CommitDetails)
go func() {
defer func() {
// recover from panic if one occured. Set err to nil otherwise.
if r := recover(); r != nil {
log.Info("<===X=X=X=X=X=X=X=X=X=X=X=X=X=X=X=X=X=X=X===> ", r, repo.Name)
log.Fatal(r, repo.Name, commits)
}
}()
//defer commits.Close()
commits.ForEach(func(commit *object.Commit) error {
commitObject := &CommitDetails{
Hash: commit.Hash.String(),
Author: commit.Author.Email,
Repo: repo.Name,
}
log.WithFields(logrus.Fields{
"type": "sources-git",
"obj": commitObject.Author,
"repo": repo,
}).Debug("Commit object")
out <- commitObject
return nil
})
close(out)
}()
return out
}
Так что, запустив этот код для более чем 150 репозиториев, я мог видеть объекты фиксации в журнале, но через некоторое время все блокировалось, и я не вижу никаких новых выходных данных, поэтому после подождав несколько минут, я разочаровался и запустил это не для списка репо, а только для одного репо (самое большое с 40K + коммитов)
Когда я запускаю его только для одного репо (и как в любое другое время) через некоторое время я перестаю видеть какой-либо новый вывод), через некоторое время мой ноутбук запускается зависает, и вдруг мой ноутбук блокируется, и когда я его разблокирую, я вижу, что все процессы, которые выполнялись до блокировки, были остановлены, как мой браузер, оболочка, go программа, которая заморозила мой ноутбук.
Однако, когда я запускаю это для этого репо, но вместо того, чтобы отправлять коммиты в elasitcsearch, я записываю их в файл json, он завершает процесс без остановки моего ноутбука примерно за 15 минут
Я не совсем уверен в чем проблема блокирует горутин и через некоторое время начинает кушать память? (поскольку у меня был открыт htop вместе с журналами моей программы, я начал использовать около 3/16 ГБ оперативной памяти, а затем через несколько минут он увеличился до 4 ГБ, то есть, когда мой ноутбук начал зависать, а затем через несколько секунд это было заперся и пошел спать)
Кто-нибудь еще сталкивался с этим?
Есть ли в любом случае, я могу выяснить, какая из подпрограмм может вызывать эту проблему, когда я запускаю несколько программ, а также, когда я запускаю его только для одного репозитория, есть ли способ узнать, действительно ли программа выполняет какие-либо действия? или остановился / заблокирован?