Как я могу читать и писать из потока GRPC одновременно - PullRequest
0 голосов
/ 01 февраля 2019

Я сейчас реализую алгоритм Рафта, и я хочу использовать поток gRPC для этого.Моя основная идея состоит в том, чтобы создать 3 потока для каждого узла для всех остальных узлов, один поток будет передавать RPC одного типа, есть AppendEntries, RequestVote и InstallSnapshot.Я пишу некоторый код с ограниченной справкой из route_guide , потому что в своей демонстрации двунаправленного потока RouteChat клиент отправляет все свои данные, прежде чем он начнет читать.

Во-первых, я хочу записать в поток в любое время, поэтому я пишу следующие коды

void RaftMessagesStreamClientSync::AsyncRequestVote(const RequestVoteRequest& request){
    std::string peer_name = this->peer_name;
    debug("GRPC: Send RequestVoteRequest from %s to %s\n", request.name().c_str(), peer_name.c_str());
    request_vote_stream->Write(request);
}

Между тем, я хочу, чтобы поток продолжал читать из потока, как следующие коды, который вызывается сразу после создания RaftMessagesStreamClientSync.

void RaftMessagesStreamClientSync::handle_response(){
    // strongThis is a must 
    auto strongThis = shared_from_this();
    t1 = new std::thread([strongThis](){
        RequestVoteResponse response;
        while (strongThis->request_vote_stream->Read(&response)) {
            debug("GRPC: Recv RequestVoteResponse from %s, me %s\n", response.name().c_str(), strongThis->raft_node->name.c_str());
            ...
        }
    });
    ...

Чтобы инициализировать 3 потока, я должен написать конструктор, как здесь, я использую 3 ClientContext здесь, потому что документ говорит один ClientContext для одного RPC

struct RaftMessagesStreamClientSync : std::enable_shared_from_this<RaftMessagesStreamClientSync>{
    typedef grpc::ClientReaderWriter<RequestVoteRequest, RequestVoteResponse> CR;
    typedef grpc::ClientReaderWriter<AppendEntriesRequest, AppendEntriesResponse> CA;
    typedef grpc::ClientReaderWriter<InstallSnapshotRequest, InstallSnapshotResponse> CI;

    std::unique_ptr<CR> request_vote_stream;
    std::unique_ptr<CA> append_entries_stream;
    std::unique_ptr<CI> install_snapshot_stream;
    ClientContext context_r;
    ClientContext context_a;
    ClientContext context_i;
    std::thread * t1 = nullptr;
    std::thread * t2 = nullptr;
    std::thread * t3 = nullptr;
    ...
}
RaftMessagesStreamClientSync::RaftMessagesStreamClientSync(const char * addr, struct RaftNode * _raft_node) : raft_node(_raft_node), peer_name(addr) {
    std::shared_ptr<Channel> channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
    stub = raft_messages::RaftStreamMessages::NewStub(channel);
    // 1
    request_vote_stream = stub->RequestVote(&context_r);
    // 2
    append_entries_stream = stub->AppendEntries(&context_a);
    // 3
    install_snapshot_stream = stub->InstallSnapshot(&context_i);    
}
~RaftMessagesStreamClientSync() {
    raft_node = nullptr;
    t1->join();
    t2->join();
    t3->join();
    delete t1;
    delete t2;
    delete t3;
}

Затем я реализую на стороне сервера

Status RaftMessagesStreamServiceImpl::RequestVote(ServerContext* context, ::grpc::ServerReaderWriter< ::raft_messages::RequestVoteResponse, RequestVoteRequest>* stream){
    RequestVoteResponse response;
    RequestVoteRequest request;
    while (stream->Read(&request)) {
        ...
    }

    return Status::OK;
}

Затем возникают 2 проблемы:

  1. Когда я тестируюс 3 узлами, которые фактически создают 2 RaftMessagesStreamServiceImpl для каждого узла, оператор от 1 до 3 требует много времени для выполнения.
  2. Нет RPC, полученного со стороны сервера.Существуют похожие проблемы при использовании Bidi Aysnc Server , однако я не могу понять, как этот пост может мне помочь.

ОБНОВЛЕНИЕ

После некоторой отладки я обнаружил, что request_vote_stream->Write(request) возвращает 0, что, согласно документу , означает, что поток закрыт .Однако почему он закрыт?

1 Ответ

0 голосов
/ 01 февраля 2019

После некоторой отладки я обнаружил, что все две проблемы связаны с одной проблемой, которую я создаю клиенту перед тем, как создать сервер.

Поскольку я изначально использую унарные вызовы RPC, предыдущий вызов от клиента вызывает только код ошибки gRPC 14. Программа продолжается, поскольку каждый вызов, отправленный после создания сервера, может быть обработан правильно.

Однако, когда дело доходит до потоковых вызовов, stub->RequestVote(&context_r) в конечном итоге вызовет функцию блокировки ClientReaderWriter::ClientReaderWriter, которая попытается подключиться к серверу, который не создан сейчас.

/// Block to create a stream and write the initial metadata and \a request
/// out. Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface* channel,
                 const ::grpc::internal::RpcMethod& method,
                 ClientContext* context)
  : context_(context),
    cq_(grpc_completion_queue_attributes{
        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
        GRPC_CQ_DEFAULT_POLLING}),  // Pluckable cq
    call_(channel->CreateCall(method, context, &cq_)) {
if (!context_->initial_metadata_corked_) {
  ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
      ops;
  ops.SendInitialMetadata(context->send_initial_metadata_,
                          context->initial_metadata_flags());
  call_.PerformOps(&ops);
  cq_.Pluck(&ops);
}
}

КакКак следствие, соединение еще не установлено.

...