Apache Beam DataflowRunner не может записать в AWS S3 - PullRequest
0 голосов
/ 28 февраля 2020

В настоящее время мы разрабатываем Apache Beam конвейер для чтения данных из GCP Pub / Sub и записи полученных данных в корзину в AWS S3.

Мы используем TextIO.write в beam-sdks-java-io.amazon-web-services для записи в S3.

TextIO.write()
        .withWindowedWrites()
        .withNumShards(options.getNumShards)
        .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getTempLocation))
        .to(FileBasedSink.convertToFileResourceIfPossible(options.getOutputDirectory))

Итак, сначала мы протестировали этот конвейер локально, используя DirectRunner, и он работал нормально. (Данные, поступающие из Pub / Sub, были получены конвейером и записаны в S3.

options.setRunner(classOf[DirectRunner])
options.setStagingLocation("./outputFolder/staging")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")

В последней части мы хотели запустить этот конвейер с использованием обработчика потока данных без каких-либо изменений кода, поэтому мы изменили код на используйте DataflowRunner

options.setRunner(classOf[DataflowRunner])
options.setStagingLocation("gs://my-gcs-bucket/binaries")
options.setGcpTempLocation("gs://my-gcs-bucket/temp")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")

С этим параметром данные принимаются конвейером из pub / sub, но не записываются в S3. Также нет ошибок, записываемых в журналы потока данных в StackDriver.

Кто-нибудь знает, в чем может быть проблема? Неправильная ли конфигурация параметров конвейера? Или запись в S3 завершается с ошибкой?

У кого-нибудь есть предложения по настройке журналов в beam-sdks-java-io.amazon-web-services вывести протокол уровня DEBUG?

Спасибо!

1 Ответ

1 голос
/ 28 февраля 2020

Чтобы выполнить конвейер, используя DataflowRunner, вы должны установить следующие поля в PipelineOptions:

  • project - идентификатор вашего проекта Google Cloud
  • runner - Бегунок конвейера, который проанализирует вашу программу и создаст ваш конвейер.
  • gcpTempLocation - Путь к облачному хранилищу для Dataflow для создания любого временного файлы. Вы должны создать этот сегмент заранее, перед запуском конвейера.
  • stagingLocation - Область хранения в облаке для Dataflow для размещения ваших двоичных файлов. Если вы не установите этот параметр, то, что вы указали для tempLocation, будет использовано и для места размещения.

Вам необходимо указать параметр project, например, options.setProject("my-project-id");.

Одна важная вещь: если вы используете Apache Beam SDK для Java 2.15.0 или новее, вы также должны указать опцию region . Пожалуйста, обратитесь к официальной документации для получения дополнительной информации.

Надеюсь, это поможет.

...