database / sql rows.scan зависает после 350K строк - PullRequest
0 голосов
/ 06 мая 2020

У меня есть задача извлечь данные из базы данных Oracle, и я пытаюсь извлечь для обработки огромные данные> 6MM записей со 100 столбцами.

Необходимо преобразовать данные в карту. Мне удалось обработать 350 тысяч записей менее чем за 35 секунд. После этого сервер зависает и дальше не идет. Есть ли способ группировать их в зависимости от размера строки или группировать их, чтобы освободить место.

func FetchUsingGenericResult(ctx context.Context, dsConnection *string, sqlStatement string) (*entity.GenericResultCollector, error) {
    columnTypes := make(map[string]string)
    var resultCollection entity.GenericResultCollector
    db, err := sql.Open("godror", *dsConnection)
    if err != nil {
        return &resultCollection, errors.Wrap(err, "error connecting to Oracle")
    }
    log := logger.FromContext(ctx).Sugar()
    log.Infof("start querying from Oracle at :%v", time.Now())
    rows, err := db.Query(sqlStatement, godror.FetchRowCount(defaultFetchCount))
    if err != nil {
        return &resultCollection, errors.Wrap(err, "error querying")
    }
    objects, err := rows2Strings(ctx, rows)
    log.Infof("total Rows converted are :%v by %v", len(*objects), time.Now())
    resultCollection = entity.GenericResultCollector{
        Columns: columnTypes,
        Rows:    objects,
    }
    return &resultCollection, nil
}

func rows2Strings(ctx context.Context, rows *sql.Rows) (*[]map[string]string, error) {
    result := make(map[string]string)
    resultsSlice := []map[string]string{}
    fields, err := rows.Columns()
    if err != nil {
        return nil, err
    }
    log := logger.FromContext(ctx).Sugar()
    waitGroup, ctx := errgroup.WithContext(ctx)
    counter := 0
    for rows.Next() {
        counter++
        if counter%defaultFetchCount == 0 {
            log.Infof("finished converting %v rows by %v", counter, time.Now())
        }
        waitGroup.Go(func() error {
            result, err = row2mapStr(rows, fields)
            if err != nil {
                return err
            }
            resultsSlice = append(resultsSlice, result)
            return nil
        })
        if err := waitGroup.Wait(); err != nil {
            return nil, err
        }
    }
    return &resultsSlice, nil
}

func row2mapStr(rows *sql.Rows, fields []string) (resultsMap map[string]string, err error) {
    result := make(map[string]string)
    scanResultContainers := make([]interface{}, len(fields))
    for i := 0; i < len(fields); i++ {
        var scanResultContainer interface{}
        scanResultContainers[i] = &scanResultContainer
    }
    if err := rows.Scan(scanResultContainers...); err != nil {
        return nil, err
    }
    for ii, key := range fields {
        rawValue := reflect.Indirect(reflect.ValueOf(scanResultContainers[ii]))
        // if row is null then as empty string
        if rawValue.Interface() == nil {
            result[key] = ""
            continue
        }

        if data, err := value2String(&rawValue); err == nil {
            result[key] = data
        } else {
            return nil, err
        }
    }
    return result, nil
}

func value2String(rawValue *reflect.Value) (str string, err error) {
    aa := reflect.TypeOf((*rawValue).Interface())
    vv := reflect.ValueOf((*rawValue).Interface())
    switch aa.Kind() {
    case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
        str = strconv.FormatInt(vv.Int(), 10)
    case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
        str = strconv.FormatUint(vv.Uint(), 10)
    case reflect.Float32, reflect.Float64:
        str = strconv.FormatFloat(vv.Float(), 'f', -1, 64)
    case reflect.String:
        str = vv.String()
    case reflect.Array, reflect.Slice:
        switch aa.Elem().Kind() {
        case reflect.Uint8:
            data := rawValue.Interface().([]byte)
            str = string(data)
            if str == "\x00" {
                str = "0"
            }
        default:
            err = fmt.Errorf("Unsupported struct type %v", vv.Type().Name())
        }
    // time type
    case reflect.Struct:
        if aa.ConvertibleTo(timeType) {
            str = vv.Convert(timeType).Interface().(time.Time).Format(time.RFC3339Nano)
        } else {
            err = fmt.Errorf("Unsupported struct type %v", vv.Type().Name())
        }
    case reflect.Bool:
        str = strconv.FormatBool(vv.Bool())
    case reflect.Complex128, reflect.Complex64:
        str = fmt.Sprintf("%v", vv.Complex())
    default:
        err = fmt.Errorf("Unsupported struct type %v", vv.Type().Name())
    }
    return
}

Кто-нибудь сталкивался с подобной проблемой?

Изменил код, как показано ниже:

func FetchUsingGenericResult(ctx context.Context, dsConnection *string, sqlStatement string) (*entity.GenericResultCollector, error) {
    columnTypes := make(map[string]string)
    var resultCollection entity.GenericResultCollector
    db, err := sql.Open("godror", *dsConnection)
    if err != nil {
        return &resultCollection, errors.Wrap(err, "error connecting to Oracle")
    }
    log := logger.FromContext(ctx).Sugar()
    log.Infof("start querying from Oracle at :%v", time.Now())
    rows, err := db.Query(sqlStatement, godror.FetchRowCount(defaultFetchCount))
    if err != nil {
        return &resultCollection, errors.Wrap(err, "error querying")
    }
    objects, err := rows2Strings(ctx, rows)
    log.Infof("total Rows converted are :%v by %v", len(*objects), time.Now())
    resultCollection = entity.GenericResultCollector{
        Columns: columnTypes,
        Rows:    objects,
    }
    return &resultCollection, nil
}

func rows2Strings(ctx context.Context, rows *sql.Rows) (*[]map[string]string, error) {
    result := make(map[string]string)
    resultsSlice := []map[string]string{}
    fields, err := rows.Columns()
    if err != nil {
        return nil, err
    }
    log := logger.FromContext(ctx).Sugar()
    counter := 0
    for rows.Next() {
        counter++
        if counter%defaultFetchCount == 0 {
            log.Infof("finished converting %v rows by %v", counter, time.Now())
        }
        result, err = row2mapStr(rows, fields)
        if err != nil {
            return nil, err
        }
        resultsSlice = append(resultsSlice, result)
    }
    return &resultsSlice, nil
}

func row2mapStr(rows *sql.Rows, fields []string) (resultsMap map[string]string, err error) {
    result := make(map[string]string)
    scanResultContainers := make([]interface{}, len(fields))
    for i := 0; i < len(fields); i++ {
        var scanResultContainer interface{}
        scanResultContainers[i] = &scanResultContainer
    }
    if err := rows.Scan(scanResultContainers...); err != nil {
        return nil, err
    }
    for ii, key := range fields {
        rawValue := reflect.Indirect(reflect.ValueOf(scanResultContainers[ii]))
        // if row is null then as empty string
        if rawValue.Interface() == nil {
            result[key] = ""
            continue
        }

        if data, err := value2String(&rawValue); err == nil {
            result[key] = data
        } else {
            return nil, err
        }
    }
    return result, nil
}

func value2String(rawValue *reflect.Value) (str string, err error) {
    aa := reflect.TypeOf((*rawValue).Interface())
    vv := reflect.ValueOf((*rawValue).Interface())
    switch aa.Kind() {
    case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
        str = strconv.FormatInt(vv.Int(), 10)
    case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
        str = strconv.FormatUint(vv.Uint(), 10)
    case reflect.Float32, reflect.Float64:
        str = strconv.FormatFloat(vv.Float(), 'f', -1, 64)
    case reflect.String:
        str = vv.String()
    case reflect.Array, reflect.Slice:
        switch aa.Elem().Kind() {
        case reflect.Uint8:
            data := rawValue.Interface().([]byte)
            str = string(data)
            if str == "\x00" {
                str = "0"
            }
        default:
            err = fmt.Errorf("Unsupported struct type %v", vv.Type().Name())
        }
    // time type
    case reflect.Struct:
        if aa.ConvertibleTo(timeType) {
            str = vv.Convert(timeType).Interface().(time.Time).Format(time.RFC3339Nano)
        } else {
            err = fmt.Errorf("Unsupported struct type %v", vv.Type().Name())
        }
    case reflect.Bool:
        str = strconv.FormatBool(vv.Bool())
    case reflect.Complex128, reflect.Complex64:
        str = fmt.Sprintf("%v", vv.Complex())
    default:
        err = fmt.Errorf("Unsupported struct type %v", vv.Type().Name())
    }
    return
}

1 Ответ

0 голосов
/ 06 мая 2020

Поскольку память сервера была ограничена, массив мог застрять без продолжения. Я начал обрабатывать данные по мере их сканирования, и это решило мою проблему.

...