Поэтому я хочу запустить отдельную процедуру для каждого репозитория github, чтобы получить все его PR и отправить их на соответствующие каналы и объединить все эти каналы, чтобы использовать все PR разных репозиториев из одного канала.
Это фрагмент кода для этого.
func (c *Client) FetchAllPRs() {
c.GetRepos()
c.GetGitData()
c.ghCollector.SetSHAMap(c.shaMap)
start := time.Now()
logging.Debug("TOTAL REPOS ", len(c.repos))
var channels []<-chan *sources.PRDetails
for _, repo := range c.repos {
channels = append(channels, c.ghCollector.GetPRNumbers(c.context, repo.Details))
}
sink := mergePRChannels(channels)
count := 0
for _ = range sink {
count += 1
}
elapsed := time.Since(start)
logging.Info(count, " took time: ", elapsed)
}
func mergePRChannels(outputsChan []<-chan *sources.PRDetails) <-chan *sources.PRDetails {
var wg sync.WaitGroup
// make return channel
merged := make(chan *sources.PRDetails)
wg.Add(len(outputsChan))
output := func(sc <-chan *sources.PRDetails) {
for sqr := range sc {
merged <- sqr
}
// once channel (square numbers sender) closes,
// call `Done` on `WaitGroup` to decrement counter
wg.Done()
}
for _, optChan := range outputsChan {
go output(optChan)
}
// run goroutine to close merged channel once done
go func() {
// wait until WaitGroup finishes
wg.Wait()
close(merged)
}()
return merged
}
Теперь ответы от Github разбиты на страницы, и процедура извлечения PR запускается внутри него oop.
type PRDetails struct {
RepoName string
PR githubv4.Int
}
// GetPRNumbers returns a channel of PR details
func (ghc *GithubCollector) GetPRNumbers(ctx context.Context, repo *RepoDetails) <-chan *PRDetails {
// these two are just struct types representing the graphql request
var fbreq firstBatchRequest
var sbreq subsequentBatchRequest
var hasNextPage githubv4.Boolean
var cursor githubv4.String
out := make(chan *PRDetails)
go func(out chan *PRDetails, repoName string, fbreq firstBatchRequest, sbreq subsequentBatchRequest, hastNextPage githubv4.Boolean, cursor githubv4.String) {
// graphql options
opts := map[string]interface{}{
}
err := ghc.graphQLClient.Query(ctx, &fbreq, opts)
if err != nil {
close(out)
return
}
if len(fbreq.Repository.PullRequests.Nodes) == 0 {
close(out)
return
}
for _, pr := range fbreq.Repository.PullRequests.Nodes {
out <- &PRDetails{
RepoName: repo.Name,
PR: pr.Number,
}
}
hasNextPage = fbreq.Repository.PullRequests.PageInfo.HasNextPage
cursor = fbreq.Repository.PullRequests.PageInfo.EndCursor
// no more PRs after the first batch, return
if hastNextPage == githubv4.Boolean(false) {
logging.Debug("no next page", repoName)
return
}
// loop till all the PR numbers have been collected
for {
// graphql options
opts := map[string]interface{}{
}
// doesn't appear to be a valid cursor ``
err := ghc.graphQLClient.Query(ctx, &sbreq, opts)
if err != nil {
break
}
for _, pr := range sbreq.Repository.PullRequests.Nodes {
out <- &PRDetails{
RepoName: repo.Name,
PR: pr.Number,
}
}
cursor = sbreq.Repository.PullRequests.PageInfo.EndCursor
if sbreq.Repository.PullRequests.PageInfo.HasNextPage == githubv4.Boolean(false) {
break
}
}
}(out, repoName, fbreq, sbreq, hasNextPage, cursor)
return out
}
Итак происходит то, что он работает нормально для первого запроса, то есть он получает первые 100 PR-номеров для всех репо, но никогда не вводит для l oop, чтобы получить следующий пакет, используя EndCursor, возвращаемый API Github .
Таким образом, я распечатал вывод только для одного репо, используя его имя в операторе if, чтобы проверить, не было ли для значения hasNextPage
установлено значение false, что может стать причиной того, что он не введет для l oop но, к моему удивлению, журнал распечатал истину, и он также вошел в это утверждение if и вернул
if hastNextPage == githubv4.Boolean(false) {
// our channel already has values in it so we use a naked return without close
logging.Debug("no next page", repoName)
return
}
он не должен был этого делать Теперь я думаю, что все аргументы, которые я передаю своему закрытию goroutine, являются копиями или ссылками на то, что некоторые другие goroutine могут менять true на false, что не имеет никакого смысла. Поскольку этот c.ghCollector.GetPRNumbers
выполняется в al oop, я предположил, что каждый раз, когда эта функция вызывается, все переменные, которые объявлены вне замыкания, будут создаваться для каждой функции отдельно, и мне не понадобятся какие-либо мьютексы для чтения или записи. , Я очень сбит с толку, ничего не имеет смысла.
Что я здесь не так делаю?
Также я не уверен, следует ли мне передавать Чана на закрытие или нет, это, вероятно, будет без него нормально работают но у меня мозг зажарен
Вот краткий пример проблемы
func FetchAllPRs() {
repos := []{"repoa", "repob"}
for _, repo := range repos {
channels = append(channels, GetPRNumbers(repo))
}
sink := mergePRChannels(channels)
count := 0
for _ = range sink {
count += 1
}
fmt.Println(count)
}
func mergePRChannels(outputsChan []<-chan *PRDetails) <-chan *PRDetails {
var wg sync.WaitGroup
merged := make(chan *PRDetails)
wg.Add(len(outputsChan))
output := func(sc <-chan *PRDetails) {
for sqr := range sc {
merged <- sqr
}
wg.Done()
}
for _, optChan := range outputsChan {
go output(optChan)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
type Response struct {
Cursor string
HasNextPage bool
}
func mockGithubResponse() {
return &Response{
Cursor: "fdsfdsfdsfsdfsfsdwrWfsdfs=",
HasNextPage: true,
}
}
type PRDetails struct {
RepoName string
PR githubv4.Int
}
// GetPRNumbers returns a channel of PR details
func GetPRNumbers(repoName string) <-chan *PRDetails {
var hasNextPage bool
var cursor string
var resp *Response
out := make(chan *PRDetails)
go func(repoName string, resp *Response, hastNextPage bool, cursor string) {
// execute the graphql query and unmarshall the result in struct
// for now using a mock reponse
resp := mockGithubResponse()
hasNextPage = resp.HasNextPage
cursor = resp.Cursor
// no more PRs after the first batch, return
if hastNextPage == false {
fmt.Println("no next page", repoName)
return
}
// loop till all the PR numbers have been collected
for {
fmt.Println("if hasNextPage then enter the for loop")
// get more PRs
}
close(out)
}(repoName, resp, hasNextPage, cursor)
return out
}