закрытие go каналов и проблема с высокой загрузкой процессора - PullRequest
0 голосов
/ 16 апреля 2020

Я новичок в go каналах и наткнулся на проблему с закрытием каналов. Я пытаюсь собрать тысячи записей API и отправить их * kafka topi c. В процессе я нажимаю API1, чтобы собрать все данные, а затем делю эти данные на 2 группы. Одна группа отправляется и получает больше данных от API2, а другая группа напрямую отправляется в Кафку. Я использую программы и каналы, чтобы минимизировать время, необходимое для выполнения sh вызова API2. Но, похоже, я что-то здесь упускаю или неправильно закрываю каналы, которые вызывают мой процессор. Я запускаю это на процессоре 1500 milicore каждые 12 часов, используя тикер. Первый запуск занял 30 секунд и 5% процессорного времени, но последний запуск занял 400 секунд и 60% процессорного времени. Это касается меня, что я мог оставить каналы висящими или не закрытыми должным образом? Любая помощь будет оценена.

func main {

  ticker.C triggers runschedule 

     func runschedule () {
       run ()
     }

     func run () {
       API1 = url 
       ch := make(chan string)

       for _, apiObject := api1data {

         // split into 2 groups using if loop 
         if group1 {
           construct each url using a specific name from API1 data
           go fetch(api2, ch, object, string(name))
           if <-ch != "" {
             fmt.Printf(<-ch) //print any error from api2 
           }
          } else { // group2
             //directly to kafka
             ......
             push(km)
           }
        }

     func fetch (url string, ch chan<- string, Object (struct type) , name string) {
       resp, err := http.Get(url)
       if err != nil {
         ch <- fmt.Sprint(err)
         return
       }
       defer resp.Body.Close()
       responseData, err := ioutil.ReadAll(resp.Body)
        if err != nil {
          ch <- fmt.Sprintf("While reading %s: %v", url, err)
          return
        }
        err = json.Unmarshal(responseData, &api2Data)
        if err != nil {
          ch <- fmt.Sprint(err)
          return
        }
        for _, api2Object := range api2Data.Records {
          // construct kafka messages
          .....
          km = append(km, newKafkaRecord)
        }
        push(km)
        close(ch)
      }

      func push() {
        //push KM to kafka topic
      }
    }
  }







...