Пример для потребителя Google Pub / Sub в C ++ - PullRequest
1 голос
/ 22 мая 2019

Я пытаюсь поиграть в Google Pub / Sub, и мне нужно интегрировать его в базу кода C ++.

Поскольку в C ++ отсутствует встроенная поддержка Google Pub/Sub, я использую его через gRPC. Таким образом, я сгенерировал соответствующие файлы pubsub.grpc.pb.h, pubsub.grpc.pb.cc, pubsub.pb.h и pubsub.pb.cc через protoc.

Вопрос: из-за отсутствия документации было бы очень полезно иметь пример на C ++. Я нашел пример для издателя, но не для подписчика. Я попытался погрузиться в сгенерированный код и примеры на других языках, но возникает много вопросов. Есть ли пример для абонентской части? Или, может быть, у кого-то уже был такой опыт?

1 Ответ

2 голосов
/ 24 мая 2019

Так же, как вы делаете запросы на публикацию, вы можете делать StreamingPull запросов на сообщения. Обратите внимание, что это простое доказательство концепции, и на практике вы, вероятно, захотите сделать этот код более надежным; например создавать несколько потоков, обрабатывать сообщения в пуле потоков, реализовывать управление потоком данных и т. д. *

#include <iostream>
#include <memory>

#include <grpc++/grpc++.h>

#include "google/pubsub/v1/pubsub.grpc.pb.h"

auto main() -> int {
    using grpc::ClientContext;
    using grpc::ClientReaderWriter;
    using google::pubsub::v1::Subscriber;
    using google::pubsub::v1::StreamingPullRequest;
    using google::pubsub::v1::StreamingPullResponse;

    auto creds = grpc::GoogleDefaultCredentials();
    auto stub = std::make_unique<Subscriber::Stub>(
        grpc::CreateChannel("pubsub.googleapis.com", creds));

    // Open up the stream.
    ClientContext context;
    std::unique_ptr<ClientReaderWriter<
        StreamingPullRequest, StreamingPullResponse>> stream(
            stub->StreamingPull(&context));

    // Send initial message.
    StreamingPullRequest request;
    request.set_subscription(
        "projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
    request.set_stream_ack_deadline_seconds(10);
    stream->Write(request);

    // Receive messages.
    StreamingPullResponse response;
    while (stream->Read(&response)) {
      // Ack messages.
      StreamingPullRequest ack_request;
      for (const auto &message : response.received_messages()) {
        ack_request.add_ack_ids(message.ack_id());
      }
      stream->Write(ack_request);
    }
}

Это новейший API Cloud Pub / Sub, и в настоящее время рекомендуемый способ получения сообщений из службы; это особенно верно для пользователей, которые ожидают высокую пропускную способность и низкую задержку. В настоящее время не существует клиентской библиотеки для C ++, но для нее существует открытая проблема на GitHub. Существующие клиентские библиотеки для других языков (например, Java) уже используют этот API, поэтому вы можете скопировать их функциональность в свой собственный код C ++.

Для более простых сценариев использования вы также можете использовать более старый Pull API, который выполняет много независимых запросов на сообщения. Обратите внимание, что для высокой пропускной способности и низкой задержки вы, скорее всего, должны сделать много одновременных асинхронных RPC: см. GRPC документация .

#include <iostream>
#include <memory>

#include <grpc++/grpc++.h>

#include "google/pubsub/v1/pubsub.grpc.pb.h"

auto main() -> int {
    using grpc::ClientContext;
    using google::pubsub::v1::Subscriber;
    using google::pubsub::v1::PullRequest;
    using google::pubsub::v1::PullResponse;

    auto creds = grpc::GoogleDefaultCredentials();
    auto stub = std::make_unique<Subscriber::Stub>(
        grpc::CreateChannel("pubsub.googleapis.com", creds));

    PullRequest request;
    request.set_subscription(
        "projects/pubsub-cpp-api-1504713535863/subscriptions/testing");
    request.set_max_messages(50);
    request.set_return_immediately(false);

    PullResponse response;
    ClientContext ctx;

    auto status = stub->Pull(&ctx, request, &response);
    if (!status.ok()) {
        // ...
    }

    // Do something with "response".
}

В качестве последнего средства вы можете использовать подписку Push , которая потребует только реализации конечной точки HTTP на вашем клиенте. Однако это обычно не рекомендуется, если вы не используете несколько подписок или если ваш клиент не может отправлять исходящие запросы.

...