Я работаю над проектом C ++, в котором используется Google Pub / Sub.
Поскольку встроенная поддержка Google Pub / Sub в C ++ отсутствует, я использую его через gRPC. Таким образом, я создал с помощью protoc соответствующие файлы pubsub.grpc.pb.h, pubsub.grpc.pb.cc, pubsub.pb.h и pubsub.pb.cc.
Я написал облегченный класс-оболочку, для управления подпиской. Класс в основном создает новый поток и начинает прослушивать новые сообщения. Вот пример кода (код был построен на основе этого вопроса):
class Consumer
{
public:
Consumer();
~Consumer();
void startConsume();
// ...
std::string m_subscriptionName;
std::unique_ptr<std::thread> m_thread;
std::shared_ptr<grpc::Channel> m_channel;
std::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub;
std::atomic<bool> m_runThread;
};
Consumer::Consumer()
{
m_channel = grpc::CreateChannel("pubsub.googleapis.com:443", grpc::GoogleDefaultCredentials());
m_stub = google::pubsub::v1::Subscriber::NewStub(m_channel);
m_subscriptionName = "something";
}
Consumer::~Consumer()
{
m_runThread = false;
if (m_thread && m_thread->joinable())
{
m_thread->join();
}
}
void Consumer::startConsume()
{
m_thread.reset(new std::thread([this]()
{
m_runThread = true;
while (m_runThread)
{
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest,
google::pubsub::v1::StreamingPullResponse>> stream(m_stub->StreamingPull(&context));
// send the initial message
google::pubsub::v1::StreamingPullRequest req;
req.set_subscription(m_subscriptionName);
req.set_stream_ack_deadline_seconds(10);
// if write passed successfully, start subscription
if (!stream->Write(req))
{
continue;
}
// receive messages
google::pubsub::v1::StreamingPullResponse response;
while (stream->Read(&response))
{
google::pubsub::v1::StreamingPullRequest ack_request;
for (const auto& message : response.received_messages())
{
// process messages ...
ack_request.add_ack_ids(message.ack_id());
}
stream->Write(ack_request);
}
}
}));
}
В процессе создается несколько экземпляров класса Consumer
.
Кажется, работает отлично. Однако иногда программа застревает на stream->Read(&response)
коде. Отладка показала, что поток застрял внутри вызова функции Read()
- поток ничего не читает и тоже не выходит из функции, несмотря на то, что буфер Pub / Sub не пуст. После перезапуска приложения все сообщения успешно читаются. Это похоже на тупик внутри Read()
.
Есть ли что-то, что я делаю неправильно? Что может вызвать такое поведение?