У меня есть следующее server/server.go
определение для моего gRPC
сервера:
package main
import (
"flag"
"github.com/golang/glog"
pb "github.com/go-grpc-tutorial/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"net"
)
// Implements of EchoServiceServer
type echoServer struct{}
func newEchoServer() pb.EchoServiceServer {
return new(echoServer)
}
func (s *echoServer) Echo(ctx context.Context, msg *pb.Message) (*pb.Message, error) {
glog.Info(msg)
return msg, nil
}
func Run() error {
listen, err := net.Listen("tcp", ":50051")
if err != nil {
return err
}
server := grpc.NewServer()
pb.RegisterEchoServiceServer(server, newEchoServer())
server.Serve(listen)
return nil
}
func main() {
flag.Parse()
defer glog.Flush()
if err := Run(); err != nil {
glog.Fatal(err)
}
}
Я хочу, чтобы сервер прослушивал поток Kinesis
, поэтому всякий раз, когда сообщение отправляется на Kinesis
stream, он будет использовать это сообщение и отправит его методу Echo()
. Итак, я предполагаю что-то вроде:
func handleKinesis(s *echoServer) error {
// wait until a message is published to the kinesis stream
records, err := readFromKinesis() // not implemented here
if err != nil {
return err
}
for _, record := records {
message := string(record.Data)
_, _ = s.Echo(context.Background(), &pb.Message{msg: message})
}
}
И handleKinesis
будет работать в фоновом режиме, просто слушая новые сообщения Kinesis
, а затем вызывает echoServer
для обработки этого сообщения. Как я могу это сделать?