Как читать асинхронную потоковую передачу на стороне сервера с использованием GRPC c ++ - PullRequest
3 голосов
/ 22 октября 2019

Я пытаюсь реализовать асинхронную потоковую передачу на стороне сервера с использованием C ++. но я не могу найти хороший пример для того же. Мне трудно читать поток асинхронно.

Код сервера

class ServerImpl final
{
public:
    ~ServerImpl()
    {
        server_->Shutdown();
        cq_->Shutdown();
    }

    void Run()
    {
        std::string server_address("0.0.0.0:50051");

        ServerBuilder builder;
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service_);

        cq_ = builder.AddCompletionQueue();
        server_ = builder.BuildAndStart();
        std::cout << "Server listening on " << server_address << std::endl;

        HandleRpcs();
    }

private:
    class CallData
    {
    public:
        CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
            : service_(service)
            , cq_(cq)
            , responder_(&ctx_)
            , status_(CREATE)
            , times_(0)
        {
            Proceed();
        }
      void Proceed()
        {
            if (status_ == CREATE)
            {
                status_ = PROCESS;
                service_->RequestsayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
            }
            else if (status_ == PROCESS)
            {                   
                if (times_ == 0)
                {
                    new CallData(service_, cq_);
                }    
                if (times_++ >= 3)
                {
                    status_ = FINISH;
                    responder_.Finish(Status::OK, this);
                }
                else
                {
                    std::string prefix("Hello ");
                    reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());    
                    responder_.Write(reply_, this);
                }
            }
            else
            {
                GPR_ASSERT(status_ == FINISH);
                delete this;
            }
        }    
    private:
        MultiGreeter::AsyncService* service_;
        ServerCompletionQueue* cq_;
        ServerContext ctx_;  
        HelloRequest request_;
        HelloReply reply_; 
        ServerAsyncWriter<HelloReply> responder_;    
        int times_;   
        enum CallStatus
        {
            CREATE,
            PROCESS,
            FINISH
        };
        CallStatus status_;
    };

    void HandleRpcs()
    {
        new CallData(&service_, cq_.get());
        void* tag; 
        bool ok;
        while (true)
        {
            GPR_ASSERT(cq_->Next(&tag, &ok));
            GPR_ASSERT(ok);
            static_cast<CallData*>(tag)->Proceed();
        }
    }
    std::unique_ptr<ServerCompletionQueue> cq_;
    MultiGreeter::AsyncService service_;
    std::unique_ptr<Server> server_;
};

код клиента

class GreeterClient
{
public:
    GreeterClient(std::shared_ptr<Channel> channel)
        : stub_(MultiGreeter::NewStub(channel))
    {}
    void SayHello(const std::string& user, const std::string& num_greetings)
    {
        HelloRequest request;
        request.set_name(user);
        request.set_num_greetings(num_greetings);
        ClientContext context;
        CompletionQueue cq;
        void* got_tag = (void*)1;
        bool ok = false;
        std::unique_ptr<ClientAsyncReader<HelloReply>>       reader(stub_>PrepareAsyncSayHello(&context,request, &cq));    
        std::cout << "Got reply: " << reply.message() << std::endl;          
        reader->Read(&reply,got_tag);
        Status status;
        reader->Finish(&status, (void*)1);
        GPR_ASSERT(cq.Next(&got_tag, &ok));
        GPR_ASSERT(ok);   
        if (status.ok()) 
        {
            std::cout << "sayHello rpc succeeded." << std::endl;
        } 
        else 
        {
            std::cout << "sayHello rpc failed." << std::endl;
            std::cout << status.error_code() << ": " << status.error_message() << std::endl;
        }
    }
private:
    std::unique_ptr<MultiGreeter::Stub> stub_;
};

Мой прото-сервис

rpc SayHello (HelloRequest) returns (stream HelloReply) {}

когда я читаю поток синхронно с помощью ClientReader, он работает нормально. но когда я использую ClientAsyncReader, я получаю эту ошибку времени выполнения.

Assertion failed: (started_), function Finish, file /usr/local/include/grpcpp/impl/codegen/async_stream_impl.h, line 250.
23:25:40: The program has unexpectedly finished.

Заранее спасибо.

...