У меня есть огромное количество данных, которые необходимо загрузить в бухгалтерскую книгу, для которой я использую пакетную обработку и делю входные данные на несколько частей и применяю параллелизм к каждой части, для которой я запускаю две go routines.One предназначен для проверки наличия дублированных данных, а другой - для загрузки их в бухгалтерскую книгу. Только если данные еще не присутствуют в бухгалтерской книге, происходит загрузка. Но когда я делаю проверку дубликатов для 95 из 99 данных, я ' Я получаю эту же ошибку. Только первые данные каждого пакета проходят и загружаются. Так, только 4 элемента загружаются. Все остальные сбои с одинаковой ошибкой
Ошибка
[7373995b] error sending GET_STATE_BY_RANGE
Код
const maxBatchSize int = 25
skip := 0
filesAmount := len(data)
batchAmount := int(math.Ceil(float64(filesAmount / maxBatchSize)))
for i := 0; i <= batchAmount; i++ {
lowerBound := skip
upperBound := skip + maxBatchSize
if upperBound > filesAmount {
upperBound = filesAmount
}
batchItems := data[lowerBound:upperBound]
skip += maxBatchSize
lineItemChan := make(chan map[string]interface{})
var itemProcessingGroup sync.WaitGroup
itemProcessingGroup.Add(len(batchItems))
for idx := range batchItems {
go func(item *LineItem, idx int) {
if err := item.CheckForDuplicateData(stub, args[2]); err != nil {
defer itemProcessingGroup.Done()
fmt.Println("got error for %s internal doc no due to %s", item.InternalDocNo, err.Error())
errors = append(errors, ErrorItem{Reason: err.Error(), LineNO: (idx + 1)})
} else {
Map := make(map[string]interface{})
fmt.Println("No empty data found for %s", item)
Map["data"] = item
Map["index"] = idx
fmt.Println("sending to upload %s", Map)
lineItemChan <- Map
}
}(&batchItems[idx], idx)
go func() {
fmt.Println("waiting")
Map, ok := <-lineItemChan
fmt.Println("recieved %s", Map)
if ok {
defer itemProcessingGroup.Done()
var item LineItem
data, err := json.Marshal(Map["data"])
json.Unmarshal(data, &item)
item.UserId = args[0]
item.Type = args[1]
item.Status = "Uploaded"
item.LockId = ""
idx := Map["index"].(int)
AsBytes, _ := json.Marshal(item)
fmt.Println("Uploading %s", item)
IndexKey, err := stub.CreateCompositeKey(indexName, []string{args[2], item.InternalDocNo, txId})
if err != nil {
errors = append(errors, ErrorItem{Reason: err.Error(), LineNO: (idx + 1)})
}
errWhileUpdating := stub.PutState(IndexKey, AsBytes)
fmt.Println(errWhileUpdating)
if errWhileUpdating != nil {
errors = append(errors, ErrorItem{Reason: "Failed to upload", LineNO: (i + 1)})
}
}
}()
}
itemProcessingGroup.Wait()
close(lineItemChan)
Дубликат чека
func (li *LineItem) CheckForDuplicateData(stub shim.ChaincodeStubInterface, No string) error {
Key := "Rno~docno~txid"
var docNo string = li.InternalDocNo
fmt.Println("Checking duplicate for line item %s", li)
resultsIterator, err := stub.GetStateByPartialCompositeKey(Key,[]string{No,docNo})
if err != nil {
fmt.Println("Got errror while checking duplicate due to %s", err.Error())
return fmt.Errorf("Failed to fetch composite keys for RNo: %v, docNo: %v due to %v",
No, docNo, err.Error())
}
fmt.Println("Does this exists ? %s", resultsIterator.HasNext())
if resultsIterator.HasNext() {
errorStr := fmt.Errorf("A line item with No: %v, accountingDocNo: %v, already existed",
No, docNo)
return errorStr
}
return nil
}