Мне нужно спанировать несколько goroutines, которые делают вызовы API для GitHub, чтобы получать обзоры PR для всех репо организации в пакетном режиме. Например
{RepoA: #1, #2, ..........#20}
{RepoA: #21, #22, ..........#40}
.
.
{RepoZ: #1, #2, ..........#20}
Это мой вызывающий процесс
// get the heartbeats channel
heartbeats := signal.Heartbeats()
// get the run chasennel
run := signal.Run()
rem, _ := c.ghCollector.GetRemainingRequests(ctx)
// fill Signal.run chan with as many empty structs as there are github api requests left
signal.SetRun(rem)
// run all jobs
for _, batch := range batches {
signal.RegisterTask() // this just increments a wait group counter
c.ghCollector.GetReviewsWorker(ctx, batch, signal, heartbeats, run)
}
go func() {
logging.Debug("infinite goroutine")
for {
select {
case <-quit:
logging.Debug("all goroutines have exited, quit")
return
case <-heartbeats:
logging.Debug("goroutines are running, skip the API Rate limit poll in default case")
continue
default:
// get the number of requests remaining
if rem, _ := c.ghCollector.GetRemainingRequests(ctx); rem > 0 {
// fill the run channel with the remaining number of requests with empty structs
signal.SetRun(rem)
} else {
logging.Debug("none left")
// if requests are not available then loop again
heartbeats <- struct{}{}
}
}
}
}()
// wait for all the goroutines to be done and decrement the waitgroup counter
signal.Wait()
quit <- struct{}{}
Это рабочая функция
GetReviewsWorker(ctx context.Context, prs *types.PRBatch, signal *orchestrator.Signal, heartbeats chan struct{}, run <-chan struct{}) {
go func() {
// decrement wg
defer signal.Done()
for _, pr := prs.Batch {
// run only if there are requests left
<-run
// if running then send a heartbeat to indicate the worker is not idle/waiting
heartbeats <- struct{}{}
// do some work, make some API calls
}
}()
}
Это структура сигнала
type Signal struct {
run chan struct{}
heartbeats chan struct{}
wg sync.WaitGroup
}
func NewSignal(length int) *Signal {
return &Signal{
heartbeats: make(chan struct{}),
// because send and receives from a nil channel block forever
run: make(chan struct{}, length),
}
}
func (s *Signal) SetRun(runs int) {
// only send runs as many as the running tasks
for i := 0; i < runs; i++ {
logging.Debug("RUN RUN RUN ", i)
s.run <- struct{}{}
}
}
// other methods
//
Я запускаю это для организации, которая запускает около 1000 процедур и некоторое время работает нормально, но в итоге зависает и ничего не печатается в журналах.
Что я делаю не так? Мой план состоял в том, чтобы делать вызов API каждый раз, когда у нас есть доступные запросы, заполняя run chan количеством оставшихся запросов. Если в канале что-то есть, выполнение будет продолжено, в противном случае оно будет блокироваться до тех пор, пока канал не будет снова заполнен
В случае продолжения, сигнал отправляется в канал пульса (который существует, чтобы избежать создания запрос на ограничение скорости на Github). Если возникает сердцебиение, мы знаем, что программа выполняет некоторую работу и не блокируется, поэтому мы просто oop выбираем как обычно, не проверяя оставшиеся запросы
Как только все процедуры завершены, мы отправляем выход Сигнал, чтобы остановить процесс опроса Github API для ограничения скорости, и мы сделали. Я думал, что это будет работать нормально, но в моей голове это работало только нормально.
РЕДАКТИРОВАТЬ: добавить еще несколько деталей, касающихся проблемы
Так что я пытаюсь сделать, это получить количество отзывов на каждый PR для организации, использующей API GraphQL. У меня есть количество PR для каждого репо в небольших партиях, скажем, 20, как описано вверху.
Для каждой партии я хочу запустить моего рецензента, который будет получать количество рецензий для этой партии PR звоня по телефону oop. Этот проверяющий работник работает для каждой партии в своей собственной программе
Так как Github имеет ограничение скорости 5000 рэк / час, я хочу иметь возможность запускать все мои программы, но как только я достигну нормы скорости, я Я хочу, чтобы все они приостановили выполнение и подождали, пока у нас не будет больше доступных запросов. Я использую каналы с пустыми структурами, чтобы указать, есть ли у нас доступные запросы или нет, это - запускать канал, когда он заполнен, мы go о нашем бизнесе, когда он пусто блокирует выполняющиеся программы.
Перед запуском рабочих, выполняющих проверку, код вызывает github, чтобы узнать, сколько запросов доступно, исходя из этого числа, мы заполняем канал запуска таким количеством пустых структур, чтобы мы можем сделать как можно больше вызовов API в наших программах и затем блокировать.