Я сейчас реализую алгоритм Рафта, и я хочу использовать поток gRPC для этого.Моя основная идея состоит в том, чтобы создать 3 потока для каждого узла для всех остальных узлов, один поток будет передавать RPC одного типа, есть AppendEntries
, RequestVote
и InstallSnapshot
.Я пишу некоторый код с ограниченной справкой из route_guide , потому что в своей демонстрации двунаправленного потока RouteChat
клиент отправляет все свои данные, прежде чем он начнет читать.
Во-первых, я хочу записать в поток в любое время, поэтому я пишу следующие коды
void RaftMessagesStreamClientSync::AsyncRequestVote(const RequestVoteRequest& request){
std::string peer_name = this->peer_name;
debug("GRPC: Send RequestVoteRequest from %s to %s\n", request.name().c_str(), peer_name.c_str());
request_vote_stream->Write(request);
}
Между тем, я хочу, чтобы поток продолжал читать из потока, как следующие коды, который вызывается сразу после создания RaftMessagesStreamClientSync
.
void RaftMessagesStreamClientSync::handle_response(){
// strongThis is a must
auto strongThis = shared_from_this();
t1 = new std::thread([strongThis](){
RequestVoteResponse response;
while (strongThis->request_vote_stream->Read(&response)) {
debug("GRPC: Recv RequestVoteResponse from %s, me %s\n", response.name().c_str(), strongThis->raft_node->name.c_str());
...
}
});
...
Чтобы инициализировать 3 потока, я должен написать конструктор, как здесь, я использую 3 ClientContext
здесь, потому что документ говорит один ClientContext для одного RPC
struct RaftMessagesStreamClientSync : std::enable_shared_from_this<RaftMessagesStreamClientSync>{
typedef grpc::ClientReaderWriter<RequestVoteRequest, RequestVoteResponse> CR;
typedef grpc::ClientReaderWriter<AppendEntriesRequest, AppendEntriesResponse> CA;
typedef grpc::ClientReaderWriter<InstallSnapshotRequest, InstallSnapshotResponse> CI;
std::unique_ptr<CR> request_vote_stream;
std::unique_ptr<CA> append_entries_stream;
std::unique_ptr<CI> install_snapshot_stream;
ClientContext context_r;
ClientContext context_a;
ClientContext context_i;
std::thread * t1 = nullptr;
std::thread * t2 = nullptr;
std::thread * t3 = nullptr;
...
}
RaftMessagesStreamClientSync::RaftMessagesStreamClientSync(const char * addr, struct RaftNode * _raft_node) : raft_node(_raft_node), peer_name(addr) {
std::shared_ptr<Channel> channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
stub = raft_messages::RaftStreamMessages::NewStub(channel);
// 1
request_vote_stream = stub->RequestVote(&context_r);
// 2
append_entries_stream = stub->AppendEntries(&context_a);
// 3
install_snapshot_stream = stub->InstallSnapshot(&context_i);
}
~RaftMessagesStreamClientSync() {
raft_node = nullptr;
t1->join();
t2->join();
t3->join();
delete t1;
delete t2;
delete t3;
}
Затем я реализую на стороне сервера
Status RaftMessagesStreamServiceImpl::RequestVote(ServerContext* context, ::grpc::ServerReaderWriter< ::raft_messages::RequestVoteResponse, RequestVoteRequest>* stream){
RequestVoteResponse response;
RequestVoteRequest request;
while (stream->Read(&request)) {
...
}
return Status::OK;
}
Затем возникают 2 проблемы:
- Когда я тестируюс 3 узлами, которые фактически создают 2
RaftMessagesStreamServiceImpl
для каждого узла, оператор от 1 до 3 требует много времени для выполнения. - Нет RPC, полученного со стороны сервера.Существуют похожие проблемы при использовании Bidi Aysnc Server , однако я не могу понять, как этот пост может мне помочь.
ОБНОВЛЕНИЕ
После некоторой отладки я обнаружил, что request_vote_stream->Write(request)
возвращает 0, что, согласно документу , означает, что поток закрыт .Однако почему он закрыт?