gRPC медленная пропускная способность для удаленного сервера - PullRequest
0 голосов
/ 29 мая 2019

У меня есть служба gRPC, которая передает файлы с локального компьютера на удаленный сервер, и я замечаю некоторые существенные проблемы с пропускной способностью.В среднем он загружается со скоростью около 1 Мбит / с с одним соединением, разделяющим несколько потоков (обычно около 8).

Сервер использует TLS для шифрования, но это не является узким местом, так как отключение TLSоказывает незначительное влияние на производительность.Я также попытался использовать iperf3 для проверки пропускной способности непосредственно между клиентом и сервером, и это привело к 10 Мбит / с.

Connecting to host <host>, port <port>
[  7] local 10.0.0.112 port 59651 connected to <ip> port <port>
[ ID] Interval           Transfer     Bitrate
[  7]   0.00-1.00   sec  1.28 MBytes  10.7 Mbits/sec
[  7]   1.00-2.00   sec   894 KBytes  7.35 Mbits/sec
[  7]   2.00-3.00   sec   999 KBytes  8.17 Mbits/sec
[  7]   3.00-4.00   sec  1.19 MBytes  10.0 Mbits/sec
[  7]   4.00-5.00   sec   753 KBytes  6.17 Mbits/sec
[  7]   5.00-6.00   sec  1.16 MBytes  9.67 Mbits/sec
[  7]   6.00-7.00   sec  1.00 MBytes  8.44 Mbits/sec
[  7]   7.00-8.00   sec  1.26 MBytes  10.5 Mbits/sec
[  7]   8.00-9.00   sec  1.22 MBytes  10.2 Mbits/sec
[  7]   9.00-10.00  sec  1.15 MBytes  9.66 Mbits/sec
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate
[  7]   0.00-10.00  sec  10.8 MBytes  9.09 Mbits/sec                  sender
[  7]   0.00-10.00  sec  10.7 MBytes  8.95 Mbits/sec                  receiver

Пропускная способность загрузки с клиента составляет около 10 Мбит / с,и загрузка сервера составляет около 50 Мбит / с (через speedtest-cli)

traceroute тоже ничего интересного не показывает ...

traceroute to mikemeredith.ddns.net (108.52.111.249), 64 hops max, 72 byte packets
 1  10.0.0.1 (10.0.0.1)  2.195 ms  5.388 ms  1.385 ms
 2  <ip>  (<ip>)  8.256 ms  145.115 ms  19.025 ms
 3  <ip2> (<ip2>)  9.951 ms  9.471 ms  141.929 ms
 4  <ip3> (<ip3>)  18.389 ms  9.684 ms  12.248 ms
 5  <ip4> (<ip4>)  143.880 ms  25.077 ms  10.606 ms
 6  ae-13-ar01.capitolhghts.md.bad.comcast.net (68.87.168.61)  142.567 ms  137.153 ms  20.790 ms
 7  be-33657-cr02.ashburn.va.ibone.comcast.net (68.86.90.57)  14.326 ms  144.076 ms  22.957 ms
 8  be-1102-cs01.ashburn.va.ibone.comcast.net (96.110.32.169)  13.881 ms  144.756 ms  23.981 ms
 9  be-2107-pe07.ashburn.va.ibone.comcast.net (96.110.32.186)  20.203 ms  94.433 ms  23.034 ms
10  n-a.gw12.iad8.alter.net (152.179.50.205)  20.254 ms  278.023 ms  31.660 ms
11  * * *
12  <ip12> (<ip13>)  66.277 ms  39.229 ms  34.543 ms
13  <ip13> (<ip14>)  48.849 ms  49.300 ms  49.546 ms


ВотФактический код

Клиентское соединение:

creds, err := credentials.NewClientTLSFromFile(cerLoc, "")
if err != nil {
    fmt..Printf("failed to get tls from file: %v\n", err)
    panic(err)
}
conn, err = grpc.Dial(host+port, grpc.WithTransportCredentials(creds))

Клиентский поток:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := proto.Client(conn)
stream, err := client.BackupFiles(ctx, grpc.UseCompressor(gzip.Name))
// Send on stream, max size of message is 2mb

Прослушивание сервера:

// Start serving on port
l, err := net.Listen("tcp", port)
if err != nil {
    fmt.Printf("error listening on port %v: %v\n", port, err)
    panic(err)
}

var s *grpc.Server
creds, err := credentials.NewServerTLSFromFile(
    certLoc,
    keyLoc,
)
if err != nil {
    fmt.Printf("error getting tls certs: %v\n", err)
    panic(err)
}
s = grpc.NewServer(grpc.Creds(creds))
proto.RegisterBackupServer(s, &server{})
err = s.Serve(l)


// Actual stream handling

// Get a pooled SharedBuffer for assembling the file
b := getBuffer()
defer putBuffer(b)
c := make(chan int, []50)
u, _ := user.Current()

uid := uuid.New()
fout, err := os.Create(filePath + uid.String())
if err != nil {
    fmt.Println("error creating staging file: ", err)
    panic(err)
}
var wg sync.WaitGroup

go assemble(b, fout, c, &wg)
for {
    in, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        fmt.Println("encountered an error receiving on stream: ", err)
        return err
    }


    bytesWritten = b.LockWrite(in.Payload) // This buffer is shared between the stream and the go routine

    c <- bytesWritten
}


close(c)
wg.Wait()

_ = fout.Close()

// This is a pre-declared workerpool that basically moves files around 
wp.Submit(func() {
    finalizeFile(fout.Name(), name, perms, "", checkSum, userID)
})

return stream.SendAndClose(&proto.Resp{
    Status:   true,
})

func assemble(b *buffer.SharedBuffer, fout *os.File, in chan int, wg *sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    buf := make([]byte, buffer.BUFFSIZE*2)

    for i := range in {
        if fout != nil {
            b.Lock()
            _, err := b.Read(buf[:i])
            b.Unlock()
            if err == io.EOF {
                continue
            }
            if err != nil {
                panic(err)
            }
            n, err := fout.Write(buf[:i])
            if err != nil {
                panic(err)
            }
            if n != i {
                fmt.Printf("failed to write all bytes to file: %v != %v", n, i)
                panic(err)
            }
        }
    }
}

Похоже, я могу отсутствоватьчто-то с внутренней работой gRPC?

1 Ответ

0 голосов
/ 30 мая 2019

Если я правильно понял, вы читаете входной поток внутри одной процедуры и отправляете байты во второй программе.Почему бы вам не позволить второй программе полностью обработать поток?Таким образом, первая процедура свободна для обработки следующих потоков, если таковые имеются.

Обычно шаблон состоит в том, чтобы иметь одну подпрограмму, которая прослушивает входящие запросы и порождает новые подпрограммы для их обработки.Крайне важно, чтобы основная программа не вызывала никакого API блокировки, кроме той, которая прослушивает запросы.

Например:

for {
    newStream := ListenForStreams() //Block until next stream
    go consumeStream(newStream)
}
...