Я работаю над механизмом рекомендаций с Apache Prediction IO. Перед сервером событий у меня есть GO api, который прослушивает события от клиента и импортера. В конкретном случае, когда клиент использует импортера, я собираю импортированные идентификаторы и отправляю json от api импортера на GO api. Например, если пользователь импортирует CSV, содержащий 45000 данных, я отправляю эти 45000 идентификаторов на GO api в json, например, {"barcodes":[...]}
. Сервер событий прогнозирования ввода-вывода хочет данные в определенной форме.
type ItemEvent struct {
Event string `json:"event"`
EntityType string `json:"entityType"`
EntityId string `json:"entityId"`
Properties map[string][]string `json:"properties"`
EventTime time.Time `json:"eventTime"`
}
type ItemBulkEvent struct {
Event string `json:"event"`
Barcodes []string `json:"barcodes"`
EventTime time.Time `json:"eventTime"`
}
ItemEvent
- это окончательные данные, которые я отправлю на сервер событий из GO Api. ItemBulkEvent
- это данные, которые я получаю от импортера api.
func HandleItemBulkEvent(w http.ResponseWriter, r *http.Request) {
var itemBulk model.ItemBulkEvent
err := decode(r,&itemBulk)
if err != nil {
log.Fatalln("handleitembulkevent -> ",err)
util.RespondWithError(w,400,err.Error())
}else {
var item model.ItemEvent
item.EventTime = itemBulk.EventTime; item.EntityType = "item"; item.Event = itemBulk.Event
itemList := make([]model.ItemEvent,0,50)
for index, barcode := range itemBulk.Barcodes{
item.EntityId = barcode
if (index > 0 && (index % 49) == 0){
itemList = append(itemList, item)
go sendBulkItemToEventServer(w,r,itemList)
itemList = itemList[:0]
}else if index == len(itemBulk.Barcodes) - 1{
itemList = append(itemList, item)
itemList = itemList[:( (len(itemBulk.Barcodes) - 1) % 49)]
go sendBulkItemToEventServer(w,r,itemList) // line 116
itemList = itemList[:0]
} else{
itemList = append(itemList, item)
}
}
util.RespondWithJSON(w,200,"OK")
}
}
HandleItemBulkEvent
- функция-обработчик для массовых обновлений. На этом этапе я должен упомянуть о пакетных загрузках io. Через rest api prediction сервер событий io принимает 50 событий на запрос. Итак, я создал список с 50 крышками и предметом. Я использовал один и тот же элемент, но на каждом шагу менял идентификационную часть (штрих-код) и добавлял в список. В каждом 50. элементе я использовал функцию обработчика, которая отправляет этот список на сервер событий и после этого очищает список и т.д. Сервер событий io. В этой части, когда я пытаюсь использовать 5000 + - элемент, он работает хорошо, но когда я пытаюсь использовать 45000 элементов, приложение вылетает с ошибкой ниже.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xc05938]
goroutine 620 [running]:
api-test/service.sendBulkItemToEventServer(0x1187860, 0xc00028e0e0, 0xc00029c200, 0xc00011c000, 0x31, 0x32)
/home/kadirakinkorkunc/Desktop/playground/recommendation-engine/pio_api/service/CollectorService.go:141 +0x468
created by api-test/service.HandleItemBulkEvent
/home/kadirakinkorkunc/Desktop/playground/recommendation-engine/pio_api/service/CollectorService.go:116 +0x681
Debugger finished with exit code 0
Есть идеи, как я могу решить эту проблему?
изменить : как Бурак Сердар упомянул в ответах, я исправил путаницу err, err2 и проблему гонки данных, используя маршаллинг перед отправкой. Теперь это дает мне настоящую ошибку (res, err2), я думаю.
2020/08/03 15:11:55 err http -> Post "http://localhost:7070/batch/events.json?accessKey=FJbGODbGzxD-CoLTdwTN2vwsuEEBJEZc4efrSUc6ekV1qUYAWPu5anDTyMGDoNq1": read tcp 127.0.0.1:54476->127.0.0.1:7070: read: connection reset by peer
Есть идеи по этому поводу?