Невозможно прочитать сообщения Pub / Sub из локального эмулятора в луче Apache - PullRequest
0 голосов
/ 29 апреля 2019

Я пытаюсь запустить простой конвейер Apache Beam с DirectRunner, который читает из подписки Pub / Sub и записывает сообщения на диск.

Конвейер работает нормально, когда я запускаю его на GCP, однако, когда я пытаюсь запустить его на своем локальном эмуляторе Pub / Sub, он, похоже, ничего не делает.

Я использую пользовательский класс Options, который расширяет класс org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions.

public interface Options extends PubsubOptions {

  @Description("Pub/Sub subscription to read the input from")
  @Required
  ValueProvider<String> getInputSubscription();

  void setInputSubscription(ValueProvider<String> valueProvider);

}

конвейер довольно прост

pipeline
        .apply("Read Pub/Sub Messages", PubsubIO.readMessagesWithAttributes()
        .fromSubscription(options.getInputSubscription()))

        .apply("Add a fixed window", Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE))))

        .apply("Convert Pub/Sub To String", new PubSubMessageToString())

        .apply("Write Pub/Sub messages to local disk", new WriteOneFilePerWindow());

Конвейер выполнен со следующими параметрами

mvn compile exec:java \
-Dexec.mainClass=DefaultPipeline \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=my-project \
--inputSubscription=projects/my-project/subscriptions/my-subscription \
--pubsubRootUrl=http://127.0.0.1:8681 \
--runner=DirectRunner"

Я использую этот образ докера эмулятора Pub / Sub и выполняю его с помощью следующей команды:

docker run --rm -ti -p 8681:8681 -e PUBSUB_PROJECT1=my-project,topic:my-subscription marcelcorso/gcloud-pubsub-emulator:latest

Требуется ли дополнительная настройка для этой работы?

Ответы [ 2 ]

1 голос
/ 01 мая 2019

Оказывается, что конвейер Apache Beam не может читать из локального эмулятора Pub / Sub, если у вас установлена ​​переменная окружения GOOGLE_APPLICATION_CREDENTIALS.

Как только я удалил эту переменную окружения, которая указывала на службу GCPучетная запись, конвейер работал без проблем с локальным эмулятором Pub / Sub.

0 голосов
/ 30 апреля 2019

Вы можете устранить неполадки локального эмулятора, выполнив к нему ручные HTTP-запросы (через curl), например:

$ curl -d '{"messages": [{"data": "c3Vwc3VwCg=="}]}' -H "Content-Type: application/json" -X POST localhost:8681/v1/projects/my-project/topics/topic:publish
{
  "messageIds": ["5"]
}
$ 

$ curl -d '{"returnImmediately":true, "maxMessages":1}' -H "Content-Type: application/json" -X POST localhost:8681/v1/projects/my-project/subscriptions/my-subscription:pull
{
  "receivedMessages": [{
    "ackId": "projects/my-project/subscriptions/my-subscription:9",
    "message": {
      "data": "c3Vwc3VwCg==",
      "messageId": "5",
      "publishTime": "2019-04-30T17:26:09Z"
    }
  }]
}
$

или указав на него инструмент командной строки gcloud:

$ CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=localhost:8681 gcloud pubsub topics list

Также обратите внимание, что при запуске эмулятора он создает тему и подписку с нуля, поэтому на них нет сообщений.Если ваш конвейер ожидает немедленного извлечения сообщений в подписке, это объясняет, почему он «застрял».Обратите внимание, что когда вы запускаете конвейер в GCP, тема и подписка, которые вы там используете, могут уже содержать сообщения.

...