Go GRP C Производительность двунаправленного потока - PullRequest
6 голосов
/ 29 апреля 2020

Мы разрабатываем высокочастотную торговую платформу, и в одном из наших компонентов мы внедрили grp c с golang. И нам нужно было использовать двунаправленную потоковую передачу в одном из наших сценариев использования, мы сделали пример реализации, как показано в приведенном ниже коде, однако, когда мы проверяем производительность кода, проверяя разницу между временными метками журналов в

Recv Time %v Index: %v Num: %v
Send Time %v, Index: %v, Num: %v

мы выяснили, что вызов .Send метода потока со стороны клиента и получение тех же данных путем вызова .Recv на стороне сервера тоже занимает примерно 400-800 микросекунд, что слишком мало для нас. Нам нужна максимальная производительность 10-50 микросекунд, и когда мы прочитали рекомендации, мы увидели, что grp c может go до наносекунд, если и клиент, и сервер находятся на одном компьютере (что в точности соответствует нашему случаю)

Так что я думаю, что нам не хватает некоторых опций или некоторых хитростей производительности по этому поводу. Кто-нибудь знает, что мы можем сделать, чтобы увеличить эту проблему производительности

Код клиента:

package main

import (
    "context"
    "log"
    "math/rand"

    pb "github.com/pahanini/go-grpc-bidirectional-streaming-example/src/proto"

    "time"

    "google.golang.org/grpc"
)

func main() {
    rand.Seed(time.Now().Unix())

    // dail server
    conn, err := grpc.Dial(":50005", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("can not connect with server %v", err)
    }

    // create stream
    client := pb.NewMathClient(conn)
    stream, err := client.Max(context.Background())
    if err != nil {
        log.Fatalf("openn stream error %v", err)
    }

    var max int32
    ctx := stream.Context()
    done := make(chan bool)
    msgCount := 100
    fromMsg := 0

    // first goroutine sends random increasing numbers to stream
    // and closes int after 10 iterations
    go func() {
        for i := 1; i <= msgCount; i++ {
            // generate random nummber and send it to stream
            rnd := int32(i)
            req := pb.Request{Num: rnd}
            if i-1 >= fromMsg {
                sendTime := time.Now().UnixNano()
                log.Printf("Send Time %v, Index: %v, Num: %v", sendTime,i-1,req.Num)
            }

            if err := stream.Send(&req); err != nil {
                log.Fatalf("can not send %v", err)
            }
            //afterSendTime := time.Now().UnixNano()
            //log.Printf("After Send Time %v", afterSendTime)
            //log.Printf("---------------")
            //log.Printf("%d sent", req.Num)
            //time.Sleep(time.Millisecond * 200)
        }
        if err := stream.CloseSend(); err != nil {
            log.Println(err)
        }
    }()

    // third goroutine closes done channel
    // if context is done
    go func() {
        <-ctx.Done()
        if err := ctx.Err(); err != nil {
            log.Println(err)
        }
        close(done)
    }()

    <-done
    log.Printf("finished with max=%d", max)
}

Код сервера:

package main

import (
    "io"
    "log"
    "net"
    "time"

    pb "github.com/pahanini/go-grpc-bidirectional-streaming-example/src/proto"

    "google.golang.org/grpc"
)

type server struct{}

func (s server) Max(srv pb.Math_MaxServer) error {

    log.Println("start new server")
    var max int32
    ctx := srv.Context()

    i := 0
    fromMsg := 0
    for {
        // exit if context is done
        // or continue
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // receive data from stream
        req, err := srv.Recv()

        if err == io.EOF {
            // return will close stream from server side
            log.Println("exit")
            return nil
        }
        if err != nil {
            log.Printf("receive error %v", err)
            continue
        }

        if i >= fromMsg {
            recvTime := time.Now().UnixNano()
            log.Printf("Recv Time %v Index: %v Num: %v", recvTime,i,req.Num)
        }

        i++

        // continue if number reveived from stream
        // less than max
        if req.Num <= max {
            continue
        }

        // update max and send it to stream
        /*
            max = req.Num
            resp := pb.Response{Result: max}
            if err := srv.Send(&resp); err != nil {
                log.Printf("send error %v", err)
            }
        */
        //log.Printf("send new max=%d", max)
    }
}

func main() {
    // create listiner
    lis, err := net.Listen("tcp", ":50005")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // create grpc server
    s := grpc.NewServer()
    pb.RegisterMathServer(s, server{})

    // and start...
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
...