Попытка приостановить и возобновить выполнение процедур с использованием каналов, но в конечном итоге они зависают - PullRequest
0 голосов
/ 23 марта 2020

Мне нужно спанировать несколько goroutines, которые делают вызовы API для GitHub, чтобы получать обзоры PR для всех репо организации в пакетном режиме. Например

{RepoA: #1, #2, ..........#20}
{RepoA: #21, #22, ..........#40}
.
.
{RepoZ: #1, #2, ..........#20}

Это мой вызывающий процесс

// get the heartbeats channel
heartbeats := signal.Heartbeats()
// get the run chasennel
run := signal.Run()
rem, _ := c.ghCollector.GetRemainingRequests(ctx)
// fill Signal.run chan with as many empty structs as there are github api requests left
signal.SetRun(rem)

// run all jobs
for _, batch := range batches {
        signal.RegisterTask() // this just increments a wait group counter
        c.ghCollector.GetReviewsWorker(ctx, batch, signal, heartbeats, run)
}

go func() {
            logging.Debug("infinite goroutine")
            for {
                select {
                case <-quit:
                    logging.Debug("all goroutines have exited, quit")
                    return

                case <-heartbeats:
                    logging.Debug("goroutines are running, skip the API Rate limit poll in default case")
                    continue

                default:
                 // get the number of requests remaining
                    if rem, _ := c.ghCollector.GetRemainingRequests(ctx); rem > 0 {
                        // fill the run channel with the remaining number of requests with empty structs
                        signal.SetRun(rem)
                    } else {
                        logging.Debug("none left")
                        // if requests are not available then loop again
                        heartbeats <- struct{}{}
                    }
                }
            }
        }()
       // wait for all the goroutines to be done and decrement the waitgroup counter
       signal.Wait()
       quit <- struct{}{}

Это рабочая функция

GetReviewsWorker(ctx context.Context, prs *types.PRBatch, signal *orchestrator.Signal, heartbeats chan struct{}, run <-chan struct{}) {
    go func() {
        // decrement wg
        defer signal.Done()
        for _, pr := prs.Batch {
           // run only if there are requests left 
           <-run
           // if running then send a heartbeat to indicate the worker is not idle/waiting
           heartbeats <- struct{}{}
           // do some work, make some API calls
        }
    }()
}

Это структура сигнала

type Signal struct {
    run        chan struct{}
    heartbeats chan struct{}
    wg         sync.WaitGroup
}

func NewSignal(length int) *Signal {
    return &Signal{
        heartbeats: make(chan struct{}),
        // because send and receives from a nil channel block forever
        run: make(chan struct{}, length),
    }
}

func (s *Signal) SetRun(runs int) {
    // only send runs as many as the running tasks
    for i := 0; i < runs; i++ {
        logging.Debug("RUN RUN RUN ", i)
        s.run <- struct{}{}
    }
}

// other methods
//

Я запускаю это для организации, которая запускает около 1000 процедур и некоторое время работает нормально, но в итоге зависает и ничего не печатается в журналах.

Что я делаю не так? Мой план состоял в том, чтобы делать вызов API каждый раз, когда у нас есть доступные запросы, заполняя run chan количеством оставшихся запросов. Если в канале что-то есть, выполнение будет продолжено, в противном случае оно будет блокироваться до тех пор, пока канал не будет снова заполнен

В случае продолжения, сигнал отправляется в канал пульса (который существует, чтобы избежать создания запрос на ограничение скорости на Github). Если возникает сердцебиение, мы знаем, что программа выполняет некоторую работу и не блокируется, поэтому мы просто oop выбираем как обычно, не проверяя оставшиеся запросы

Как только все процедуры завершены, мы отправляем выход Сигнал, чтобы остановить процесс опроса Github API для ограничения скорости, и мы сделали. Я думал, что это будет работать нормально, но в моей голове это работало только нормально.

РЕДАКТИРОВАТЬ: добавить еще несколько деталей, касающихся проблемы

Так что я пытаюсь сделать, это получить количество отзывов на каждый PR для организации, использующей API GraphQL. У меня есть количество PR для каждого репо в небольших партиях, скажем, 20, как описано вверху.

Для каждой партии я хочу запустить моего рецензента, который будет получать количество рецензий для этой партии PR звоня по телефону oop. Этот проверяющий работник работает для каждой партии в своей собственной программе

Так как Github имеет ограничение скорости 5000 рэк / час, я хочу иметь возможность запускать все мои программы, но как только я достигну нормы скорости, я Я хочу, чтобы все они приостановили выполнение и подождали, пока у нас не будет больше доступных запросов. Я использую каналы с пустыми структурами, чтобы указать, есть ли у нас доступные запросы или нет, это - запускать канал, когда он заполнен, мы go о нашем бизнесе, когда он пусто блокирует выполняющиеся программы.

Перед запуском рабочих, выполняющих проверку, код вызывает github, чтобы узнать, сколько запросов доступно, исходя из этого числа, мы заполняем канал запуска таким количеством пустых структур, чтобы мы можем сделать как можно больше вызовов API в наших программах и затем блокировать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...