Uploading multiple files in parallel to Amazon S3 using Goroutines & Channels - PullRequest
0 votes
/ May 14, 2019

I am trying to upload a directory to an Amazon S3 bucket. However, the only way I can see to upload a directory is to iterate over all the files inside it and upload them one by one.

I am using Go to iterate over the files in the directory. For each file I reach, I want to spin off a goroutine that uploads it while the main thread moves on to the next item in the directory and spins off another goroutine to upload that one.

Any ideas on how I can upload all the files in a directory in parallel using Goroutines and Channels?

Below is a revised snippet that uses a goroutine and a channel to upload files in parallel, but I am not sure this is the right implementation.

func uploadDirToS3(dir string, svc *s3.S3) {
    fileList := []string{}
    filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
        fmt.Println("PATH ==> " + path)
        fileList = append(fileList, path)
        return nil
    })
    for _, pathOfFile := range fileList[1:] {
        channel := make(chan bool)
        go uploadFiletoS3(pathOfFile, svc, channel)
        <-channel
    }
}

func uploadFiletoS3(path string, svc *s3.S3, channel chan bool) {
    file, err := os.Open(path)
    if err != nil {
        fmt.Println(err)
    }
    defer file.Close()
    fileInfo, _ := file.Stat()
    size := fileInfo.Size()

    buffer := make([]byte, size)
    file.Read(buffer)
    fileBytes := bytes.NewReader(buffer)
    fileType := http.DetectContentType(buffer)

    s3Path := file.Name()

    params := &s3.PutObjectInput{
        Bucket:        aws.String("name-of-bucket"),
        Key:           aws.String(s3Path),
        Body:          fileBytes,
        ContentLength: aws.Int64(size),
        ContentType:   aws.String(fileType),
    }

    resp, err := svc.PutObject(params)
    if err != nil {
        fmt.Println(err)
    }
    fmt.Printf("response %s", awsutil.StringValue(resp))
    close(channel)
}

Any ideas on how I could implement this better? I looked into WaitGroups, but for some reason I found channels much easier to understand and use in this situation.

Answers [ 2 ]

2 votes
/ May 15, 2019

So you are looking for concurrency, which is rooted in the go statement. To synchronize the goroutines launched inside the loop you can use channels OR sync.WaitGroup. The second option is a bit simpler here. You should also refactor your function and move the inner logic of the for loop into a separate function.

func uploadDirToS3(dir string, svc *s3.S3) {
    fileList := []string{}
    filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
        fileList = append(fileList, path)
        return nil
    })
    var wg sync.WaitGroup
    // fileList[0] is the root directory entry returned by Walk, so skip it.
    for _, pathOfFile := range fileList[1:] {
        wg.Add(1) // register the upload before spinning off the goroutine
        go putInS3(pathOfFile, svc, &wg)
    }
    wg.Wait()
}

func putInS3(pathOfFile string, svc *s3.S3, wg *sync.WaitGroup) {
    defer wg.Done()
    file, err := os.Open(pathOfFile)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer file.Close()
    fileInfo, _ := file.Stat()
    size := fileInfo.Size()
    buffer := make([]byte, size)
    file.Read(buffer)
    fileBytes := bytes.NewReader(buffer)
    fileType := http.DetectContentType(buffer)
    path := file.Name()
    params := &s3.PutObjectInput{
        Bucket:        aws.String("bucket-name"),
        Key:           aws.String(path),
        Body:          fileBytes,
        ContentLength: aws.Int64(size),
        ContentType:   aws.String(fileType),
    }

    resp, err := svc.PutObject(params)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("response %s", awsutil.StringValue(resp))
}
0 votes
/ May 16, 2019

The following does not strictly answer the OP, but it is an attempt to introduce parallel processing with the Go language.

Hope it helps.

package main

import (
    "log"
    "sync"
    "time"
)

func main() {

    // processInSync()
    // The processing takes up to 3 seconds,
    // it displays all the output and handles errors.

    // processInParallel1()
    // The processing takes up to few microseconds,
    // it displays some of the output and does not handle errors.
    // It is super fast, but incorrect.

    // processInParallel2()
    // The processing takes up to 1s,
    // It correctly displays all the output,
    // But it does not yet handle return values.

    processInParallel3()
    // The processing takes up to 1s,
    // It correctly displays all the output,
    // and it is able to return the first error encountered.

    // This is merely an introduction to what you are able to do.
    // More examples would be required to explain the subtleties of channels
    // and how to implement unbounded work processing.
    // I leave that as an exercise to the reader.
    // For more information and explanations about channels,
    // read The Friendly Manual and the many examples
    // available on the internet.
    // https://golang.org/doc/effective_go.html#concurrency
    // https://gobyexample.com/channels
    // https://gobyexample.com/closing-channels
}

func aSlowProcess(name string) error {
    log.Println("aSlowProcess ", name)
    <-time.After(time.Second)
    return nil
}

// processInSync is a dummy function that calls a slow function several times, one after the other.
func processInSync() error {
    now := time.Now()
    // It calls the slow process three times,
    // one after the other;
    // if an error is returned, it returns immediately.
    if err := aSlowProcess("#1"); err != nil {
        return err
    }
    if err := aSlowProcess("#2"); err != nil {
        return err
    }
    if err := aSlowProcess("#3"); err != nil {
        return err
    }
    // This is a synchronous process because it does not involve
    // any extra synchronization mechanism.
    log.Printf("processInSync spent %v\n", time.Now().Sub(now))
    return nil
}

// processInParallel1 implements a parallel processing example.
// It is not yet a fully working example; to keep it simple,
// it only implements the sending part of the processing.
func processInParallel1() error {
    now := time.Now()

    // We want to execute those function calls in parallel;
    // for that we use the go keyword, which allows us to run the function
    // in a separate routine/process/thread.
    // It is called async because the main thread and
    // the new routines need to be synchronized.
    // To synchronize two independent routines we must use
    // atomic (race-free) operations.

    // A channel is such an atomic primitive: it is safe to read from
    // and write to it from multiple parallel
    // and independent routines.

    // Before we implement such processing, we must ask ourselves
    // what input we need to distribute among the routines,
    // and what values we want to get back from them.

    // Let's create a channel of strings to distribute the input to multiple
    // independent workers.
    distributor := make(chan string)

    // The input channel MUST be read by the new routines.
    // We create three workers of the slow process, each reading a value and processing it.
    go func() {
        value := <-distributor
        aSlowProcess(value)
    }()
    go func() {
        value := <-distributor
        aSlowProcess(value)
    }()
    go func() {
        value := <-distributor
        aSlowProcess(value)
    }()

    // we must now write the values into the distributor
    // so that each worker can read and process data.
    distributor <- "#1"
    distributor <- "#2"
    distributor <- "#3"

    log.Printf("processInParallel1 spent %v\n", time.Now().Sub(now))

    return nil
}

// processInParallel2 implements a parallel processing example.
// It is not yet a fully working example; to keep it simple,
// it implements the sending part of the processing
// and the synchronization mechanism to wait for all workers
// to finish before returning.
func processInParallel2() error {
    now := time.Now()

    // We saw in the previous example how to send values and process
    // them in parallel; however, that function was not able to wait for
    // those async processes to finish before returning.

    // To implement such a synchronization mechanism,
    // where the main thread waits for all workers to finish
    // before returning, we need to use the sync package.
    // It provides the right pattern to handle that requirement.

    // In addition to the previous example we now instantiate a
    // WaitGroup https://golang.org/pkg/sync/#WaitGroup
    // The purpose of the wait group is to record a number
    // of async jobs to process and wait for them to finish.

    var wg sync.WaitGroup

    distributor := make(chan string)

    // Because we have three workers, we add one to the group per worker (three in total).
    wg.Add(1)
    go func() {
        // Then we make sure that we signal to the waitgroup
        // that the process is done.
        defer wg.Done()
        value := <-distributor
        aSlowProcess(value)
    }()
    //-
    wg.Add(1)
    go func() {
        defer wg.Done() // as an exercise, comment this line
        // and inspect the output of your program.
        value := <-distributor
        aSlowProcess(value)
    }()
    //-
    wg.Add(1)
    go func() {
        defer wg.Done()
        value := <-distributor
        aSlowProcess(value)
    }()

    // we can now write the data for processing....
    distributor <- "#1"
    distributor <- "#2"
    distributor <- "#3"

    //....and wait for their completion
    wg.Wait()

    log.Printf("processInParallel2 spent %v\n", time.Now().Sub(now))

    return nil
}

// processInParallel3 implements a parallel processing example.
// It is a fully working example that distributes jobs,
// waits for completion and collects the return values.
func processInParallel3() error {
    now := time.Now()

    var wg sync.WaitGroup
    distributor := make(chan string)

    // To capture return values we must implement a
    // way for output values to safely reach the main thread.
    // We create a channel of errors for that purpose.
    receiver := make(chan error)

    // As previously, we start the workers and attach them to a waitgroup.
    wg.Add(1)
    go func() {
        defer wg.Done()
        value := <-distributor
        err := aSlowProcess(value)
        // To return the value, we write it to the output channel.
        receiver <- err
    }()
    //-
    wg.Add(1)
    go func() {
        defer wg.Done()
        value := <-distributor
        receiver <- aSlowProcess(value)
    }()
    //-
    wg.Add(1)
    go func() {
        defer wg.Done()
        value := <-distributor
        receiver <- aSlowProcess(value)
    }()

    // we can now write the data for processing....
    distributor <- "#1"
    distributor <- "#2"
    distributor <- "#3"

    // ... read the output values
    err1 := <-receiver
    err2 := <-receiver
    err3 := <-receiver

    //....and wait for routines completion....
    wg.Wait()

    log.Printf("processInParallel3 spent %v\n", time.Now().Sub(now))

    // finally check for errors
    if err1 != nil {
        return err1
    }
    if err2 != nil {
        return err2
    }
    if err3 != nil {
        return err3
    }

    return nil
}
...