Локальный эмулятор Pubsub не будет работать с потоком данных - PullRequest
0 голосов
/ 10 апреля 2020

Я разрабатываю поток данных в Java, входные данные поступают из Pubsub. Позже я увидел руководство здесь о том, как использовать локальный эмулятор Pubsub, поэтому мне не нужно развертываться в GCP для тестирования.

Вот мой простой код:

private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {

    @Description("Pub/Sub topic to read messages from")
    String getTopic();
    void setTopic(String topic);

    @Description("Pub/Sub subscription to read messages from")
    String getSubscription();
    void setSubscription(String subscription);

    @Description("Local file output")
    String getOutput();
    void setOutput(String output);
}

public static void main(String[] args) {

    Options options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(Options.class);
    options.setStreaming(true);
    options.setPubsubRootUrl("localhost:8085");

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // other .apply's

    pipeline.run();

}

Я смог следовать руководству, включая ту часть, где мне нужно использовать пример кода Python для создания сообщений topi c, подписки, публикации и даже публикации sh. Когда я использую код Python для взаимодействия с эмулятором Pubsub, я вижу сообщение Detected HTTP/2 connection в командной строке, где я запускаю эмулятор:

Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.

Я скомпилировал / запустил код в Eclipse с использованием конфигурации запуска потока данных, но у меня возникает проблема.

enter image description here enter image description here enter image description here

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription: 
...
Caused by: java.lang.RuntimeException: Failed to create subscription: 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost

Когда я пытаюсь добавить http в строку options.setPubsubRootUrl("localhost:8085"), я получаю бесконечно повторяющееся исключение:

com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.net.PlainSocketImpl.connect(Unknown Source)
    at java.net.SocksSocketImpl.connect(Unknown Source)

Кажется, он достигает эмулятора Pubsub, но не может подключиться как команда -линии, где я запускаю эмулятор, также генерирует это бесконечно:

[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.

Как я могу заставить мой поток данных работать с эмулятором Pubsub?

1 Ответ

0 голосов
/ 10 апреля 2020

Вы пытаетесь подключиться к эмулятору Pubsub из Beam Direct Runner, используя ветвь потока данных SDK Beam 2.5. SDK Dataflow 2.5 и плагин Eclipse устарели по состоянию на 6 июня 2019 г. Однако это должно работать.

Вам нужно добавить префикс PubsubRootUrl в Beam, как вы обнаружили, с помощью http: //. Вторая проблема, которую вы видите, указывает на то, что ничего не слушается на localhost:8085. Это, вероятно, потому что на самом деле есть 2 локальных хоста: IPv4 и IPv6. Эмулятор Pubsub только прослушивает IPv4, а Windows сначала пытается использовать IPv6. Попробуйте заменить localhost на 127.0.0.1 для принудительной установки IPv4. Вы должны в конечном итоге это:

options.setPubsubRootUrl("http://127.0.0.1:8085")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...