Получение [7373995b] ошибки при отправке GET_STATE_BY_RANGE для GetStateByPartialCompositeKey с использованием подпрограмм go - PullRequest
0 голосов
/ 13 апреля 2020

У меня есть огромное количество данных, которые необходимо загрузить в бухгалтерскую книгу, для которой я использую пакетную обработку и делю входные данные на несколько частей и применяю параллелизм к каждой части, для которой я запускаю две go routines.One предназначен для проверки наличия дублированных данных, а другой - для загрузки их в бухгалтерскую книгу. Только если данные еще не присутствуют в бухгалтерской книге, происходит загрузка. Но когда я делаю проверку дубликатов для 95 из 99 данных, я ' Я получаю эту же ошибку. Только первые данные каждого пакета проходят и загружаются. Так, только 4 элемента загружаются. Все остальные сбои с одинаковой ошибкой

Ошибка

[7373995b] error sending GET_STATE_BY_RANGE

Код

    const maxBatchSize int = 25
    skip := 0
    filesAmount := len(data)
    batchAmount := int(math.Ceil(float64(filesAmount / maxBatchSize)))

    for i := 0; i <= batchAmount; i++ {

        lowerBound := skip
        upperBound := skip + maxBatchSize
        if upperBound > filesAmount {
            upperBound = filesAmount
        }
        batchItems := data[lowerBound:upperBound]
        skip += maxBatchSize

        lineItemChan := make(chan map[string]interface{})


        var itemProcessingGroup sync.WaitGroup
        itemProcessingGroup.Add(len(batchItems))

        for idx := range batchItems {

            go func(item *LineItem, idx int) {

                if err := item.CheckForDuplicateData(stub, args[2]); err != nil {
                    defer itemProcessingGroup.Done()

                    fmt.Println("got error for %s internal doc no due to %s", item.InternalDocNo, err.Error())
                    errors = append(errors, ErrorItem{Reason: err.Error(), LineNO: (idx + 1)})
                } else {

                    Map := make(map[string]interface{})
                    fmt.Println("No empty data found for %s", item)
                    Map["data"] = item
                    Map["index"] = idx
                    fmt.Println("sending to upload %s", Map)

                    lineItemChan <- Map
                }

            }(&batchItems[idx], idx)

            go func() {

                fmt.Println("waiting")
                Map, ok := <-lineItemChan
                fmt.Println("recieved %s", Map)

                if ok {
                    defer itemProcessingGroup.Done()
                    var item LineItem
                    data, err := json.Marshal(Map["data"])
                    json.Unmarshal(data, &item)
                    item.UserId = args[0]
                    item.Type = args[1]
                    item.Status = "Uploaded"
                    item.LockId = ""
                    idx := Map["index"].(int)
                    AsBytes, _ := json.Marshal(item)
                    fmt.Println("Uploading %s", item)

                    IndexKey, err := stub.CreateCompositeKey(indexName, []string{args[2], item.InternalDocNo, txId})
                    if err != nil {
                        errors = append(errors, ErrorItem{Reason: err.Error(), LineNO: (idx + 1)})
                    }

                    errWhileUpdating := stub.PutState(IndexKey, AsBytes)
                    fmt.Println(errWhileUpdating)
                    if errWhileUpdating != nil {
                        errors = append(errors, ErrorItem{Reason: "Failed to upload", LineNO: (i + 1)})
                    }
                }
            }()
        }
        itemProcessingGroup.Wait()
        close(lineItemChan)

Дубликат чека

func (li *LineItem) CheckForDuplicateData(stub shim.ChaincodeStubInterface, No string) error {
    Key := "Rno~docno~txid"

    var docNo string = li.InternalDocNo
    fmt.Println("Checking duplicate for line item %s", li)
    resultsIterator, err := stub.GetStateByPartialCompositeKey(Key,[]string{No,docNo})
    if err != nil {
        fmt.Println("Got errror while checking duplicate due to %s", err.Error())
        return fmt.Errorf("Failed to fetch composite keys for RNo: %v, docNo: %v due to %v",
            No, docNo, err.Error())
    }
    fmt.Println("Does this exists ? %s", resultsIterator.HasNext())
    if resultsIterator.HasNext() {
        errorStr := fmt.Errorf("A line item with No: %v, accountingDocNo: %v,  already existed",
            No, docNo)
        return errorStr
    }

    return nil
}
...