Golang - Ошибка при gzipping mongodb найти данные курсора запроса, записать в файл и распаковать его - PullRequest
1 голос
/ 26 апреля 2020

Я перебираю курсор в mongodb, архивирую данные и отправляю объекту S3. При попытке распаковать загруженный файл с помощью gzip -d, получая следующую ошибку,

gzip: 9.log.gz: invalid compressed data--crc error
gzip: 9.log.gz: invalid compressed data--length error

Код, который я использую для итерации, сжатия, загрузки, приведен ниже,

// CursorReader struct acts as reader wrapper on top of mongodb cursor
type CursorReader struct {
    Csr *mongo.Cursor
}

// Read func reads the data from cursor and puts it into byte array
func (cr *CursorReader) Read(p []byte) (n int, err error) {
    dataAvail := cr.Csr.Next(context.TODO())
    if !dataAvail {
        n = 0
        err = io.EOF
        if cr.Csr.Close(context.TODO()) != nil {
            fmt.Fprintf(os.Stderr, "Error: MongoDB: getting logs: close cursor: %s", err)
        }
        return
    }
    var b bytes.Buffer
    w := gzip.NewWriter(&b)
    w.Write([]byte(cr.Csr.Current.String() + "\n"))
    w.Close()
    n = copy(p, []byte(b.String()))
    err = nil
    return
}
cursor, err := coll.Find(ctx, filter) // runs the find query and returns cursor
csrRdr := new(CursorReader) // creates a new cursorreader instance
csrRdr.Csr = cursor // assigning the find cursor to cursorreader instance
_, err = s3Uploader.Upload(&s3manager.UploadInput{  // Uploading the data to s3 in parts
    Bucket: aws.String("bucket"),
    Key:    aws.String("key")),
    Body:   csrRdr, 
})

Если данных мало, проблема не возникает. но если данные огромны, то я получаю ошибку. Вещи, которые я до сих пор отлаживал, пытаясь сжать 1500 документов, каждый размером 15 МБ, получая ошибку. Даже я пытался записать сжатые байты непосредственно в файл локально, но я получаю ту же ошибку.

1 Ответ

1 голос
/ 26 апреля 2020

Похоже, проблема заключается в повторном вызове gzip.NewWriter() в func(*CursorReader) Read([]byte) (int, error)

Вы выделяете новый gzip.Writer для каждого вызова Read. Сжатие gzip с сохранением состояния, поэтому вы должны использовать только один экземпляр Writer для всех операций.


Решение # 1

Довольно простым решением вашей проблемы будет прочитать все строки в курсоре и передать его через gzip.Writer и сохранить сжатый контент в буфере в памяти.

var cursor, _ = collection.Find(context.TODO(), filter)
defer cursor.Close(context.TODO())

// prepare a buffer to hold gzipped data
var buffer bytes.Buffer
var gz = gzip.NewWriter(&buffer)
defer gz.Close()

for cursor.Next(context.TODO()) {
    if _, err = io.WriteString(gz, cursor.Current.String()); err != nil {
        // handle error somehow  ¯\_(ツ)_/¯
    }
}

// you can now use buffer as io.Reader
// and it'll contain gzipped data for your serialized rows
_, err = s3.Upload(&s3.UploadInput{
    Bucket: aws.String("..."),
    Key:    aws.String("...")),
    Body:   &buffer, 
})

Решение # 2

Другое решение будет использовать io.Pipe() и goroutines для создания потока, который читает и сжимает данные по требованию, а не в буфере в памяти. Это полезно, если данные, которые вы читаете, достаточно велики, и вы не можете хранить все это в памяти.

var cursor, _ = collection.Find(context.TODO(), filter)
defer cursor.Close(context.TODO())

// create pipe endpoints
reader, writer := io.Pipe()

// note: io.Pipe() returns a synchronous in-memory pipe
// reads and writes block on one another
// make sure to go through docs once.

// now, since reads and writes on a pipe blocks
// we must move to a background goroutine else
// all our writes would block forever
go func() {
    // order of defer here is important
    // see: https://stackoverflow.com/a/24720120/6611700
    // make sure gzip stream is closed before the pipe
    // to ensure data is flushed properly
    defer writer.Close()
    var gz = gzip.NewWriter(writer)
    defer gz.Close()

    for cursor.Next(context.Background()) {
        if _, err = io.WriteString(gz, cursor.Current.String()); err != nil {
            // handle error somehow  ¯\_(ツ)_/¯
        }
    }
}()

// you can now use reader as io.Reader
// and it'll contain gzipped data for your serialized rows
_, err = s3.Upload(&s3.UploadInput{
    Bucket: aws.String("..."),
    Key:    aws.String("...")),
    Body:   reader, 
})
...