Я строю сервер grpc с c ++ и обнаруживаю, что его память не будет освобождена после нескольких запросов. память увеличивается сначала, и если я продолжаю посылать запросы, память остается на пиковом значении. После того, как я перестану отправлять запросы, память не будет освобождена или мало памяти освободится. что не так с моим кодом, память должна быть скоро освобождена или сохранена как буфер перехвата?
class BaseCallData {
public:
BaseCallData(XFRProcessor *processor)
: processor_(processor), status_(CallStatus::CREATE) {}
virtual ~BaseCallData() = default;
void Proceed() {
if (status_ == CallStatus::CREATE) {
status_ = CallStatus::PROCESS;
Request();
} else if (status_ == CallStatus::PROCESS) {
NewCallData();
//TODO
OnRequest();
Response();
status_ = CallStatus::FINISH;
} else if (status_ == CallStatus::FINISH) {
delete this;
} else {
LOGGER_ERROR(Log::GetLog(), "wrong grpc status");
}
}
template<class RpcReq, class RpcRes, class ReqData, class RspData>
void WorkFlow(RpcReq &grpc_request,
RpcRes &grpc_response,
ServerContext &ctx,
ReqData &request_data,
RspData &response_data) {
ErrorCode error_code = RequestReader::Instance()->ReadRequest(grpc_request, ctx, request_data);
ReqTimer req_timer(request_data.log_id_, request_data.request_type_);
if (error_code == OK) {
error_code = processor_->Proceed(&request_data, &response_data, &req_timer);
response_data.error_code = error_code;
grpc_response = ResponseAssigner::Instance()->AssignResponse(response_data);
if (OK == error_code) {
req_timer.SetStatus(true);
} else {
LOGGER_WARN(Log::GetLog(),
"[REQ:{}][LOG:{}] fail to run, err[{}]",
request_data.GetRequestType(),
request_data.GetLogId(),
error_code);
req_timer.SetStatus(false, std::to_string(error_code));
}
} else {
LOGGER_WARN(Log::GetLog(), "fail to read request, err[{}]", error_code);
req_timer.SetStatus(false, std::to_string(error_code));
grpc_response.set_error_code(error_code);
grpc_response.set_error_msg(GetErrorMsg(error_code));
}
}
XFRProcessor *GetProcessor() {
return processor_;
}
private:
virtual void NewCallData() = 0;
virtual void Request() = 0;
virtual void Response() = 0;
virtual void OnRequest() = 0;
enum class CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
XFRProcessor *processor_;
};
class DetectCallData : public BaseCallData {
public:
DetectCallData(::xfr::XFRService::AsyncService *service, ServerCompletionQueue *cq, XFRProcessor *processor)
: BaseCallData(processor), p_service_(service), p_cq_(cq), responder_(&ctx_) {
Proceed();
}
void NewCallData() override {
new DetectCallData(p_service_, p_cq_, GetProcessor());
}
void Request() override {
p_service_->RequestDetect(&ctx_, &request_, &responder_, p_cq_, p_cq_, this);
}
void Response() override {
responder_.Finish(response_, Status::OK, this);
}
void OnRequest() override {
WorkFlow(request_, response_, ctx_, request_data_, response_data_);
}
private:
ServerContext ctx_;
::xfr::XFRService::AsyncService *p_service_;
ServerCompletionQueue *p_cq_;
::xfr::DetectRequest request_;
::xfr::DetectResponse response_;
DetectRequest request_data_;
DetectResponse response_data_;
ServerAsyncResponseWriter<::xfr::DetectResponse> responder_;
};
class CompareCallData : public BaseCallData {
...
};
class MatchCallData : public BaseCallData {
...
};
class XFRServer final {
public:
XFRServer(const XFRServer &) = delete;
XFRServer &operator=(const XFRServer &) = delete;
XFRServer() {
Init();
builder_.AddListeningPort(address_, InsecureServerCredentials());
builder_.RegisterService(&service_);
builder_.SetMaxReceiveMessageSize(max_receive_size_);
builder_.SetMaxSendMessageSize(max_send_size_);
for (int i = 0; i < thread_num_; ++i) {
auto p_cq = builder_.AddCompletionQueue();
v_cq_.push_back(std::move(p_cq));
}
}
void Init() {
auto grpc_config = hobot::vision::xfr::ServerConfig::GetConfig()->GetSubConfig("grpc");
address_ = grpc_config->GetSTDStringValue("server_address");
if (address_.empty()) {
LOGGER_ERROR(Log::GetLog(), "fail to get server address: {}", address_);
exit(0);
}
thread_num_ = grpc_config->GetIntValue("server_thread_count", 300);
max_receive_size_ = grpc_config->GetIntValue("max_receive_message_bytes", 20971520);
max_send_size_ = grpc_config->GetIntValue("max_send_message_bytes", 20971520);
}
~XFRServer() {
server_->Shutdown();
//document shows that cq should always shutdown after server
for (auto &cq : v_cq_) {
cq->Shutdown();
}
}
void HandleRpcs(ServerCompletionQueue *cq) {
auto detect_processor = std::make_shared<DetectProcessor>();
auto compare_processor = std::make_shared<CompareProcessor>();
auto match_processor = std::make_shared<MatchProcessor>();
new DetectCallData(&service_, cq, detect_processor.get());
new CompareCallData(&service_, cq, compare_processor.get());
new MatchCallData(&service_, cq, match_processor.get());
void *tag{nullptr};
bool ok{false};
while (true) {
if (!cq->Next(&tag, &ok)) {
LOGGER_WARN(Log::GetLog(), "Server stream closed, quiting");
break;
}
if (ok) {
static_cast<BaseCallData *>(tag)->Proceed();
}
}
}
void run() {
server_ = builder_.BuildAndStart();
for (auto &cq:v_cq_) {
v_threads_.emplace_back(
std::thread([this, &cq] {
HandleRpcs(cq.get());
})
);
}
LOGGER_INFO(Log::GetLog(), "grpc server start working...");
v_threads_.emplace_back(
std::thread([&zk_register] {
zk_register.KeepPublished();
})
);
for (auto &t: v_threads_) {
t.join();
}
}
private:
std::string address_;
int thread_num_;
std::vector<std::thread> v_threads_;
std::vector<std::unique_ptr<ServerCompletionQueue>> v_cq_;
::xfr::XFRService::AsyncService service_;
std::unique_ptr<Server> server_;
ServerBuilder builder_;
int max_receive_size_;
int max_send_size_;
};