Передача файла с ftp на S3 через io.Pipe - PullRequest
0 голосов
/ 18 октября 2019

Мой код доступа

package main

import (
    "io"
    "log"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
    "github.com/secsy/goftp"
)

func main() {
    clientConfig := goftp.Config{
        User:               "myUser",
        Password:           "myPassword",
        ConnectionsPerHost: 10,
        Timeout:            10 * time.Second,
        Logger:             os.Stderr,
    }

    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1")},
    )

    uploader := s3manager.NewUploader(sess)

    client, err := goftp.DialConfig(clientConfig, "myFeed.wmyDomain.com:myPort")
    if err != nil {
        panic(err)
    }

    reader, writer := io.Pipe()

    client.Retrieve("myPath/myFile.gz", writer)

    result, err := uploader.Upload(&s3manager.UploadInput{
        Body:   reader,
        Bucket: aws.String("myBucket"),
        Key:    aws.String("myTestFile"),
    })

    if err != nil {
        log.Fatalln("Failed to upload", err)
    }

    log.Println("Successfully uploaded to", result.Location)
    writer.CloseWithError(err)
}

Это мой вывод

  goftp: 0.000 #1 opening control connection to [myIP]:myPort
    goftp: 0.134 #1 sending command USER myUser
    goftp: 0.203 #1 got 331-Username ok, send password.
    goftp: 0.203 #1 sending command PASS ******
    goftp: 0.269 #1 got 230-Login successful.
    goftp: 0.269 #1 sending command FEAT
    goftp: 0.336 #1 got 211-Features supported:
     EPRT
     EPSV
     MDTM
     MFMT
     MLST type*;perm*;size*;modify*;unique*;unix.mode;unix.uid;unix.gid;
     REST STREAM
     SIZE
     TVFS
     UTF8
    End FEAT.
    goftp: 0.336 #1 sending command TYPE I
    goftp: 0.403 #1 got 200-Type set to: Binary.
    goftp: 0.403 #1 sending command SIZE myPath/myFile.gz
    goftp: 0.474 #1 got 213-33620711002
    goftp: 0.474 #1 was ready
    goftp: 0.474 #1 was ready
    goftp: 0.474 #1 sending command TYPE I
    goftp: 0.544 #1 got 200-Type set to: Binary.
    goftp: 0.544 #1 sending command EPSV
    goftp: 0.610 #1 got 229-Entering extended passive mode (|||60706|).
    goftp: 0.610 #1 opening data connection to [myIP]:myOtherPort
    goftp: 0.677 #1 sending command RETR myPath/myFile.gz
    goftp: 0.746 #1 got 125-Data connection already open. Transfer starting.

Однако, похоже, он просто застревает и ничего не делает. Файл большой, но у меня 0 движений по сетевой активности вообще.

Я могу загрузить файл при использовании os.Open или os.Create и указать моему клиенту, чтобы получить в этот поток, но не при использованииS3. Он даже не пытается загрузить файл тогда.

ОБНОВЛЕНИЕ: Благодаря предложению @Peters об использовании многократных подпрограмм, мне удалось заставить это работать. Однако я столкнулся с проблемой с отложенными операторами, так как понимаю, что они должны выполняться к концу вызова функции или идти рутиной. Это мой новый код.

package main

import (
    "io"
    "log"
    "os"
    "sync"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
    "github.com/secsy/goftp"
)

func main() {

    // Use sync to force each goroutine thread to complete
    messages := make(chan int)
    var wg sync.WaitGroup

    // Start a session with aws us-east-1 region
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1")},
    )

    // Create a client for s3
    uploader := s3manager.NewUploader(sess)

    // Create a client for goftp
    // TODO Have credentials pulled from SSM
    client, err := goftp.DialConfig(goftp.Config{
        User:               "MyUser",
        Password:           "MyPassword",
        ConnectionsPerHost: 10,
        Timeout:            10 * time.Second,
        Logger:             os.Stderr,
    }, "myftpHost.com")

    if err != nil {
        panic(err)
    }

    // Instatiate IO stream
    reader, writer := io.Pipe()

    // Set number of goroutines to wait for
    wg.Add(2)

    // goroutine thread that pulls the file from ftp and puts it in writer stream
    go func() {
        defer writer.Close()
        defer client.Close()
        err = client.Retrieve("MyFiletxt", writer)
        if err != nil {
            panic(err)
        }
        wg.Done()

        messages <- 1
    }()

    // goroutine thread that pull from reader thread and pushes it to s3
    go func() {
        defer reader.Close()
        result, err := uploader.Upload(&s3manager.UploadInput{
            Body:   reader,
            Bucket: aws.String("MyBucket"),
            Key:    aws.String("myTestFile"),
        })

        if err != nil {
            log.Fatalln("Failed to upload", err)
        }

        log.Println("Successfully uploaded to", result.Location)

        wg.Done()
        messages <- 2
    }()

    wg.Wait()
}

Проблема заключается в том, что операторы defer никогда не запускаются, что приводит к постоянному запуску io и зависанию всего сценария. Не уверен, почему операторы отсрочки не выполняют свою работу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...