Я новичок в go каналах и наткнулся на проблему с закрытием каналов. Я пытаюсь собрать тысячи записей API и отправить их * kafka topi c. В процессе я нажимаю API1, чтобы собрать все данные, а затем делю эти данные на 2 группы. Одна группа отправляется и получает больше данных от API2, а другая группа напрямую отправляется в Кафку. Я использую программы и каналы, чтобы минимизировать время, необходимое для выполнения sh вызова API2. Но, похоже, я что-то здесь упускаю или неправильно закрываю каналы, которые вызывают мой процессор. Я запускаю это на процессоре 1500 milicore каждые 12 часов, используя тикер. Первый запуск занял 30 секунд и 5% процессорного времени, но последний запуск занял 400 секунд и 60% процессорного времени. Это касается меня, что я мог оставить каналы висящими или не закрытыми должным образом? Любая помощь будет оценена.
func main {
ticker.C triggers runschedule
func runschedule () {
run ()
}
func run () {
API1 = url
ch := make(chan string)
for _, apiObject := api1data {
// split into 2 groups using if loop
if group1 {
construct each url using a specific name from API1 data
go fetch(api2, ch, object, string(name))
if <-ch != "" {
fmt.Printf(<-ch) //print any error from api2
}
} else { // group2
//directly to kafka
......
push(km)
}
}
func fetch (url string, ch chan<- string, Object (struct type) , name string) {
resp, err := http.Get(url)
if err != nil {
ch <- fmt.Sprint(err)
return
}
defer resp.Body.Close()
responseData, err := ioutil.ReadAll(resp.Body)
if err != nil {
ch <- fmt.Sprintf("While reading %s: %v", url, err)
return
}
err = json.Unmarshal(responseData, &api2Data)
if err != nil {
ch <- fmt.Sprint(err)
return
}
for _, api2Object := range api2Data.Records {
// construct kafka messages
.....
km = append(km, newKafkaRecord)
}
push(km)
close(ch)
}
func push() {
//push KM to kafka topic
}
}
}