Я пытаюсь реализовать асинхронную потоковую передачу на стороне сервера с использованием C ++. но я не могу найти хороший пример для того же. Мне трудно читать поток асинхронно.
Код сервера
class ServerImpl final
{
public:
~ServerImpl()
{
server_->Shutdown();
cq_->Shutdown();
}
void Run()
{
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
HandleRpcs();
}
private:
class CallData
{
public:
CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service)
, cq_(cq)
, responder_(&ctx_)
, status_(CREATE)
, times_(0)
{
Proceed();
}
void Proceed()
{
if (status_ == CREATE)
{
status_ = PROCESS;
service_->RequestsayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
}
else if (status_ == PROCESS)
{
if (times_ == 0)
{
new CallData(service_, cq_);
}
if (times_++ >= 3)
{
status_ = FINISH;
responder_.Finish(Status::OK, this);
}
else
{
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());
responder_.Write(reply_, this);
}
}
else
{
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
private:
MultiGreeter::AsyncService* service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
HelloRequest request_;
HelloReply reply_;
ServerAsyncWriter<HelloReply> responder_;
int times_;
enum CallStatus
{
CREATE,
PROCESS,
FINISH
};
CallStatus status_;
};
void HandleRpcs()
{
new CallData(&service_, cq_.get());
void* tag;
bool ok;
while (true)
{
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
MultiGreeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
код клиента
class GreeterClient
{
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(MultiGreeter::NewStub(channel))
{}
void SayHello(const std::string& user, const std::string& num_greetings)
{
HelloRequest request;
request.set_name(user);
request.set_num_greetings(num_greetings);
ClientContext context;
CompletionQueue cq;
void* got_tag = (void*)1;
bool ok = false;
std::unique_ptr<ClientAsyncReader<HelloReply>> reader(stub_>PrepareAsyncSayHello(&context,request, &cq));
std::cout << "Got reply: " << reply.message() << std::endl;
reader->Read(&reply,got_tag);
Status status;
reader->Finish(&status, (void*)1);
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(ok);
if (status.ok())
{
std::cout << "sayHello rpc succeeded." << std::endl;
}
else
{
std::cout << "sayHello rpc failed." << std::endl;
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<MultiGreeter::Stub> stub_;
};
Мой прото-сервис
rpc SayHello (HelloRequest) returns (stream HelloReply) {}
когда я читаю поток синхронно с помощью ClientReader, он работает нормально. но когда я использую ClientAsyncReader, я получаю эту ошибку времени выполнения.
Assertion failed: (started_), function Finish, file /usr/local/include/grpcpp/impl/codegen/async_stream_impl.h, line 250.
23:25:40: The program has unexpectedly finished.
Заранее спасибо.