Тупик внутри функции gRPC Read () - PullRequest
0 голосов
/ 26 октября 2019

Я работаю над проектом C ++, в котором используется Google Pub / Sub.

Поскольку встроенная поддержка Google Pub / Sub в C ++ отсутствует, я использую его через gRPC. Таким образом, я создал с помощью protoc соответствующие файлы pubsub.grpc.pb.h, pubsub.grpc.pb.cc, pubsub.pb.h и pubsub.pb.cc.

Я написал облегченный класс-оболочку, для управления подпиской. Класс в основном создает новый поток и начинает прослушивать новые сообщения. Вот пример кода (код был построен на основе этого вопроса):

class Consumer 
{
public:
    Consumer();
    ~Consumer();
    void startConsume();
// ...
    std::string m_subscriptionName;
    std::unique_ptr<std::thread> m_thread;
    std::shared_ptr<grpc::Channel> m_channel;
    std::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub;
    std::atomic<bool> m_runThread;
};

Consumer::Consumer()
{
    m_channel = grpc::CreateChannel("pubsub.googleapis.com:443", grpc::GoogleDefaultCredentials());
    m_stub = google::pubsub::v1::Subscriber::NewStub(m_channel);
    m_subscriptionName = "something";
}

Consumer::~Consumer()
{
    m_runThread = false;
    if (m_thread && m_thread->joinable())
    {
        m_thread->join();
    }
}

void Consumer::startConsume()
{
    m_thread.reset(new std::thread([this]()
    {
        m_runThread = true;
        while (m_runThread)
        {
            grpc::ClientContext context;
            std::unique_ptr<grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest, 
                                                     google::pubsub::v1::StreamingPullResponse>> stream(m_stub->StreamingPull(&context));
            // send the initial message
            google::pubsub::v1::StreamingPullRequest req;
            req.set_subscription(m_subscriptionName);
            req.set_stream_ack_deadline_seconds(10);

            // if write passed successfully, start subscription
            if (!stream->Write(req))
            {
                continue;
            }

            // receive messages
            google::pubsub::v1::StreamingPullResponse response;
            while (stream->Read(&response))
            {
                google::pubsub::v1::StreamingPullRequest ack_request;
                for (const auto& message : response.received_messages())
                {
                    // process messages ...
                    ack_request.add_ack_ids(message.ack_id());
                }
                stream->Write(ack_request);
            }
        }
    }));
}

В процессе создается несколько экземпляров класса Consumer.

Кажется, работает отлично. Однако иногда программа застревает на stream->Read(&response) коде. Отладка показала, что поток застрял внутри вызова функции Read() - поток ничего не читает и тоже не выходит из функции, несмотря на то, что буфер Pub / Sub не пуст. После перезапуска приложения все сообщения успешно читаются. Это похоже на тупик внутри Read().

Есть ли что-то, что я делаю неправильно? Что может вызвать такое поведение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...