Apache Beam: не может получить доступ к Pub / Sub Emulator через docker-compose - PullRequest
0 голосов
/ 08 марта 2019

Я создал программный продукт, использующий GCP Pub / Sub в качестве очереди сообщений, Apache Beam для построения конвейера и Flask для создания веб-сервера. Он работает бесперебойно в производственном процессе, но у меня проблемы с тем, чтобы соединить все компоненты вместе с docker-compose, в частности с конвейером Apache Beam.

Я следовал Конвейер потока данных и эмулятор pubsub , чтобы заставить конвейер прослушивать эмулятор Pub / Sub GCP, заменив localhost из ответа SO именем службы, определенной в моем docker-compose.yaml:

  pubsub_emulator:
    build: docker_images/message_queue
    ports:
      - 8085:8085

  webserver:
    build: docker_images/webserver
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    ports:
      - 8899:8080
    depends_on:
      - pubsub_emulator

   pipeline:
    build: docker_images/pipeline
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    depends_on:
      - pubsub_emulator

Веб-сервер может получить доступ к эмулятору Pub / Sub и создавать темы.

Однако конвейер не запускается при запуске с MalformedURLException:

Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166

Параметры конвейера кажутся хорошими, я определил их с помощью:

final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST"); 

BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                                .as(BasePipeline.PipeOptions.class);

options.as(DataflowPipelineOptions.class).setStreaming(true);

options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);

Pipeline pipeline = Pipeline.create(options);

Кто-нибудь получает подсказку о том, что происходит и как это решить? Единственное ли решение подразумевает установку эмулятора и конвейера в одном докере?

1 Ответ

1 голос
/ 08 марта 2019

Вы можете попытаться изменить значение на следующее:

http://pubsub_emulator:8085

Как ошибка, сообщающая о пропаже protocol, которая, как ожидается, будет http в вашем случае

Согласно Apache Beam SDK ожидается, что значением будет полный URL:

// getPubsubRootUrl
@Default.String(value="https://pubsub.googleapis.com")
 @Hidden
java.lang.String getPubsubRootUrl()
// Root URL for use with the Google Cloud Pub/Sub API.

Однако, если вы пришли из Python, вы заметите, что Python SDK , который использует gRPC Python , как показано в здесь , ожидая только адрес сервера, который состоят из адреса и порта

# A snippet from google-cloud-python library.
if os.environ.get("PUBSUB_EMULATOR_HOST"):
    kwargs["channel"] = grpc.insecure_channel(
        target=os.environ.get("PUBSUB_EMULATOR_HOST")
    )
grpc.insecure_channel(target, options=None)
Creates an insecure Channel to a server.

The returned Channel is thread-safe.

Parameters: 
target – The server address
...